Skip to content

25.6 智能维护

智能维护概述

Claude Code 可以自动化维护任务、监控系统健康状态、预测潜在问题,并提供维护优化建议。通过智能维护功能,开发者可以降低维护成本、提高系统可靠性。

自动化维护

1. 依赖管理

示例:自动化依赖管理

用户请求: "为项目实现自动化依赖管理"

Claude Code 生成的代码:

requirements.txt

bash

    Flask-SQLAlchemy==3.0.5
    Flask-JWT-Extended==4.5.2
    Flask-Mail==0.9.1
    Flask-Migrate==4.0.5
    gunicorn==21.2.0
    psycopg2-binary==2.9.7
    redis==4.6.0
    celery==5.3.4
    python-dotenv==1.0.0

    ## requirements-dev.txt

```python

    pytest==7.4.2
    pytest-cov==4.1.0
    pytest-mock==3.11.1
    black==23.9.1
    flake8==6.1.0
    mypy==1.5.1
    isort==5.12.0
    pre-commit==3.4.0

    ## 依赖更新脚本

    ````python
    `python

    #!/usr/bin/env python3
    """
    依赖更新脚本
    自动检查并更新过期的依赖包
    """

```python
    import subprocess
    import json
    from typing import List, Dict
    import sys

    def get_outdated_packages() -> List[Dict]:
"""获取过期的包"""
result = subprocess.run(
['pip', 'list', '--outdated', '--format=json'],
capture_output=True,
text=True
)
bash
    if result.returncode != 0:
    print("Error: Failed to get outdated packages")
    return []

    packages = json.loads(result.stdout)
    return packages

    def check_compatibility(package: str, version: str) -> bool:
"""检查包的兼容性"""
bash
    try:
    result = subprocess.run(
    ['pip', 'install', '--dry-run', f'{package}=={version}'],
    capture_output=True,
    text=True
    )

    return result.returncode == 0
    except Exception as e:
    print(f"Error checking compatibility for {package}: {e}")
    return False

    def update_package(package: str, version: str) -> bool:
"""更新包"""
bash
    try:
    result = subprocess.run(
    ['pip', 'install', '--upgrade', f'{package}=={version}'],
    capture_output=True,
    text=True
    )

    return result.returncode == 0
    except Exception as e:
    print(f"Error updating {package}: {e}")
    return False

    def update_requirements():
    """更新 requirements.txt"""
    try:
    result = subprocess.run(
    ['pip', 'freeze'],
    capture_output=True,
    text=True
    )

    with open('requirements.txt', 'w') as f:
    f.write(result.stdout)

    return True
    except Exception as e:
    print(f"Error updating requirements.txt: {e}")
    return False

    def main():
"""主函数"""
print("检查过期依赖...")

outdated_packages = get_outdated_packages()
bash
    if not outdated_packages:
print("所有依赖都是最新的!")
return

print(f"\n发现 {len(outdated_packages)} 个过期包:")
bash
    for pkg in outdated_packages:
    print(f"  - {pkg['name']}: {pkg['version']} -> {pkg['latest_version']}")

    print("\n检查兼容性...")
    compatible_packages = []

    for pkg in outdated_packages:
    package_name = pkg['name']
    latest_version = pkg['latest_version']

    if check_compatibility(package_name, latest_version):
    compatible_packages.append(pkg)
    print(f"  ✓ {package_name} {latest_version}")
    else:
    print(f"  ✗ {package_name} {latest_version} (不兼容)")

    if not compatible_packages:
print("\n没有可更新的包")
return

print(f"\n准备更新 {len(compatible_packages)} 个包...")
response = input("是否继续?(y/n): ")
bash
    if response.lower() != 'y':
print("取消更新")
return

print("\n开始更新...")
bash
    for pkg in compatible_packages:
    package_name = pkg['name']
    latest_version = pkg['latest_version']

    print(f"更新 {package_name} 到 {latest_version}...")
    if update_package(package_name, latest_version):
    print(f"  ✓ {package_name} 更新成功")
    else:
    print(f"  ✗ {package_name} 更新失败")

    print("\n更新 requirements.txt...")
    if update_requirements():
    print("  ✓ requirements.txt 更新成功")
    else:
    print("  ✗ requirements.txt 更新失败")
print("\n依赖更新完成!")
bash
    if __name__ == '__main__':
    main()
```## 依赖安全检查
python

    #!/usr/bin/env python3
    """
    依赖安全检查脚本
    检查依赖包的安全漏洞
    """

```python
    import subprocess
    import json
    from typing import List, Dict

    def check_security() -> List[Dict]:
    """检查依赖安全性"""
bash
        try:
            result = subprocess.run(
                ['pip', 'audit'],
                capture_output=True,
                text=True
            )

            if result.returncode != 0:
                print("Error: Failed to check security")
                return []

            return json.loads(result.stdout)
        except Exception as e:
            print(f"Error checking security: {e}")
            return []

    def main():
    """主函数"""
    print("检查依赖安全性...")

    vulnerabilities = check_security()
bash
        if not vulnerabilities:
        print("未发现安全漏洞!")
        return

    print(f"\n发现 {len(vulnerabilities)} 个安全漏洞:")
bash
        for vuln in vulnerabilities:
            print(f"\n  包: {vuln['name']}")
            print(f"  版本: {vuln['version']}")
            print(f"  漏洞: {vuln['advisory']}")
            print(f"  严重性: {vuln['severity']}")
            print(f"  修复版本: {vuln['fix_versions']}")

    if __name__ == '__main__':
        main()
### 2. 数据库维护
python

    # 示例:自动化数据库维护

    用户请求:
    "为应用实现自动化数据库维护"

    Claude Code 生成的代码:

    ```python

```python
    from datetime import datetime, timedelta
    from sqlalchemy import text
    from app import db
    import logging

    logger = logging.getLogger(__name__)

    class DatabaseMaintenance:
    """数据库维护类"""
python
        def __init__(self, db_session):
            self.db = db_session

        def analyze_tables(self):
        """分析表"""
        tables = ['users', 'products', 'orders', 'payments']
bash
            for table in tables:
                try:
                    result = self.db.execute(text(f"ANALYZE {table}"))
                    logger.info(f"Table {table} analyzed successfully")
                except Exception as e:
                    logger.error(f"Error analyzing table {table}: {e}")

        def vacuum_tables(self):
        """清理表"""
        tables = ['users', 'products', 'orders', 'payments']
bash
            for table in tables:
                try:
                    result = self.db.execute(text(f"VACUUM ANALYZE {table}"))
                    logger.info(f"Table {table} vacuumed successfully")
                except Exception as e:
                    logger.error(f"Error vacuuming table {table}: {e}")

        def rebuild_indexes(self):
        """重建索引"""
        indexes = [
            'idx_users_username',
            'idx_users_email',
            'idx_orders_user_id',
            'idx_orders_status',
            'idx_products_name'
        ]
bash
            for index in indexes:
                try:
                    result = self.db.execute(text(f"REINDEX INDEX {index}"))
                    logger.info(f"Index {index} rebuilt successfully")
                except Exception as e:
                    logger.error(f"Error rebuilding index {index}: {e}")

        def clean_old_logs(self, days=30):
        """清理旧日志"""
        cutoff_date = datetime.utcnow() - timedelta(days=days)
bash
            try:
                result = self.db.execute(
                    text("""
                        DELETE FROM logs
                        WHERE created_at < :cutoff_date
                    """),
                    {'cutoff_date': cutoff_date}
                )

                deleted_count = result.rowcount
                logger.info(f"Deleted {deleted_count} old log entries")

                self.db.commit()

                return deleted_count
            except Exception as e:
                logger.error(f"Error cleaning old logs: {e}")
                self.db.rollback()
                return 0

        def clean_old_sessions(self, days=7):
        """清理旧会话"""
        cutoff_date = datetime.utcnow() - timedelta(days=days)
bash
            try:
                result = self.db.execute(
                    text("""
                        DELETE FROM sessions
                        WHERE expires_at < :cutoff_date
                    """),
                    {'cutoff_date': cutoff_date}
                )

                deleted_count = result.rowcount
                logger.info(f"Deleted {deleted_count} old sessions")

                self.db.commit()

                return deleted_count
            except Exception as e:
                logger.error(f"Error cleaning old sessions: {e}")
                self.db.rollback()
                return 0

        def optimize_database(self):
        """优化数据库"""
bash
            try:
                self.analyze_tables()
                self.vacuum_tables()
                self.rebuild_indexes()
                logger.info("Database optimization completed successfully")
            except Exception as e:
                logger.error(f"Error optimizing database: {e}")

        def get_database_stats(self):
        """获取数据库统计信息"""
        stats = {}
bash
            try:
            # 表大小

            result = self.db.execute(text("""
                SELECT
                    schemaname,
                    tablename,
                    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
                FROM pg_tables
                WHERE schemaname = 'public'
                ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC
            """))

            stats['table_sizes'] = [
                {
                    'schema': row[0],
                    'table': row[1],
                    'size': row[2]
                }
bash
                    for row in result
                ]
            # 索引使用情况

            result = self.db.execute(text("""
                SELECT
                    schemaname,
                    tablename,
                    indexname,
                    idx_scan,
                    idx_tup_read,
                    idx_tup_fetch
                FROM pg_stat_user_indexes
                ORDER BY idx_scan DESC
            """))

            stats['index_usage'] = [
                {
                    'schema': row[0],
                    'table': row[1],
                    'index': row[2],
                    'scans': row[3],
                    'tuples_read': row[4],
                    'tuples_fetched': row[5]
                }
bash
                    for row in result
                ]
            # 慢查询

            result = self.db.execute(text("""
                SELECT
                    query,
                    calls,
                    total_time,
                    mean_time,
                    max_time
                FROM pg_stat_statements
                ORDER BY mean_time DESC
                LIMIT 10
            """))

            stats['slow_queries'] = [
                {
                    'query': row[0],
                    'calls': row[1],
                    'total_time': row[2],
                    'mean_time': row[3],
                    'max_time': row[4]
                }
bash
                    for row in result
                ]

                return stats
            except Exception as e:
                logger.error(f"Error getting database stats: {e}")
                return stats

    def run_maintenance():
    """运行维护任务"""
    maintenance = DatabaseMaintenance(db.session)

    print("开始数据库维护...")

    # 优化数据库

    print("优化数据库...")
bash
        maintenance.optimize_database()
    # 清理旧数据

    print("清理旧日志...")
bash
        maintenance.clean_old_logs(days=30)

        print("清理旧会话...")
        maintenance.clean_old_sessions(days=7)
    # 获取统计信息

    print("获取数据库统计信息...")
    stats = maintenance.get_database_stats()

    print("\n数据库统计信息:")
    print(f"  表数量: {len(stats['table_sizes'])}")
    print(f"  索引数量: {len(stats['index_usage'])}")
    print(f"  慢查询数量: {len(stats['slow_queries'])}")

    print("\n数据库维护完成!")
bash
    if __name__ == '__main__':
        run_maintenance()
## 系统监控

### 1. 性能监控
python

    # 示例:实现性能监控

    用户请求:
    "为应用实现性能监控"

    Claude Code 生成的代码:

    ```python

```python
    import time
    import psutil
    from functools import wraps
    from flask import request, g
    from prometheus_client import Counter, Histogram, Gauge, generate_latest
    import logging

    logger = logging.getLogger(__name__)
# Prometheus 指标
request_count = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)

active_connections = Gauge(
    'active_connections',
    'Number of active connections'
)

memory_usage = Gauge(
    'memory_usage_bytes',
    'Memory usage in bytes'
)

cpu_usage = Gauge(
    'cpu_usage_percent',
    'CPU usage percentage'
)

disk_usage = Gauge(
    'disk_usage_percent',
    'Disk usage percentage'
)
python
    def track_request_metrics(f):
    """跟踪请求指标"""
    @wraps(f)
python
        def decorated_function(*args, **kwargs):
            start_time = time.time()
        # 记录请求开始
        g.start_time = start_time
bash
            try:
                response = f(*args, **kwargs)
                status_code = response.status_code if hasattr(response, 'status_code') else 200
            # 记录请求计数
bash
                request_count.labels(
                    method=request.method,
                    endpoint=request.endpoint or 'unknown',
                    status=status_code
                ).inc()
            # 记录请求持续时间
            duration = time.time() - start_time
bash
                request_duration.labels(
                    method=request.method,
                    endpoint=request.endpoint or 'unknown'
                ).observe(duration)

                return response
            except Exception as e:
            # 记录错误
bash
                request_count.labels(
                    method=request.method,
                    endpoint=request.endpoint or 'unknown',
                    status=500
                ).inc()
                raise

        return decorated_function

    def update_system_metrics():
    """更新系统指标"""
    # 内存使用
    memory = psutil.virtual_memory()
bash
        memory_usage.set(memory.used)
    # CPU 使用
bash
        cpu_usage.set(psutil.cpu_percent())
    # 磁盘使用
    disk = psutil.disk_usage('/')
bash
        disk_usage.set(disk.percent)
    # 活跃连接
bash
        active_connections.set(len(psutil.net_connections()))

    class PerformanceMonitor:
    """性能监控类"""
python
        def __init__(self, app):
            self.app = app
            self.metrics = {}

        def track_function(self, name):
        """跟踪函数性能"""
python
            def decorator(f):
                @wraps(f)
                def decorated_function(*args, **kwargs):
                    start_time = time.time()

                    try:
                        result = f(*args, **kwargs)
                        duration = time.time() - start_time

                        self.record_metric(name, duration, success=True)

                        return result
                    except Exception as e:
                        duration = time.time() - start_time

                        self.record_metric(name, duration, success=False)

                        logger.error(f"Error in {name}: {e}")
                        raise

                return decorated_function
            return decorator

        def record_metric(self, name, duration, success=True):
        """记录指标"""
python
            if name not in self.metrics:
                self.metrics[name] = {
                    'count': 0,
                    'total_duration': 0,
                    'success_count': 0,
                    'error_count': 0,
                    'min_duration': float('inf'),
                    'max_duration': 0
                }

            metric = self.metrics[name]
            metric['count'] += 1
            metric['total_duration'] += duration

            if success:
                metric['success_count'] += 1
            else:
                metric['error_count'] += 1

            metric['min_duration'] = min(metric['min_duration'], duration)
            metric['max_duration'] = max(metric['max_duration'], duration)

        def get_metrics(self):
        """获取指标"""
python
            for name, metric in self.metrics.items():
                if metric['count'] > 0:
                    metric['avg_duration'] = metric['total_duration'] / metric['count']
                    metric['success_rate'] = metric['success_count'] / metric['count']

            return self.metrics

        def get_slow_functions(self, threshold=1.0):
        """获取慢函数"""
        slow_functions = []
python
            for name, metric in self.metrics.items():
                if metric['count'] > 0:
                    avg_duration = metric['total_duration'] / metric['count']
                    if avg_duration > threshold:
                        slow_functions.append({
                            'name': name,
                            'avg_duration': avg_duration,
                            'count': metric['count'],
                            'max_duration': metric['max_duration']
                        })

            return sorted(slow_functions, key=lambda x: x['avg_duration'], reverse=True)

    def setup_monitoring(app):
    """设置监控"""

    @app.route('/metrics')
python
        def metrics():
            """Prometheus 指标端点"""
            return generate_latest()

        @app.before_request
        def before_request():
        """请求前处理"""
        g.start_time = time.time()
bash
            active_connections.inc()

        @app.after_request
        def after_request(response):
        """请求后处理"""
bash
            if hasattr(g, 'start_time'):
                duration = time.time() - g.start_time

                request_count.labels(
                    method=request.method,
                    endpoint=request.endpoint or 'unknown',
                    status=response.status_code
                ).inc()

                request_duration.labels(
                    method=request.method,
                    endpoint=request.endpoint or 'unknown'
                ).observe(duration)

            active_connections.dec()

            return response
    # 定期更新系统指标
python
        def update_metrics():
            while True:
                update_system_metrics()
                time.sleep(5)

        import threading
        thread = threading.Thread(target=update_metrics, daemon=True)
        thread.start()

        return app
### 2. 错误监控

````python

````python

# 示例:实现错误监控

用户请求:
"为应用实现错误监控"

Claude Code 生成的代码:

```python

import logging
import traceback
from datetime import datetime
from typing import Dict, List
from flask import request, g
import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration

logger = logging.getLogger(__name__)

class ErrorMonitor:
    """错误监控类"""

    def __init__(self, app):
        self.app = app
        self.errors = []
        self.error_stats = {}

    def capture_exception(self, exception, context=None):
        """捕获异常"""
        error_data = {
            'type': type(exception).__name__,
            'message': str(exception),
            'traceback': traceback.format_exc(),
            'timestamp': datetime.utcnow().isoformat(),
            'context': context or {}
        }

        # 添加请求信息
        if request:
            error_data['request'] = {
                'method': request.method,
                'path': request.path,
                'url': request.url,
                'ip': request.remote_addr,
                'user_agent': request.user_agent.string
            }

        # 添加用户信息
        if hasattr(g, 'user_id'):
            error_data['user_id'] = g.user_id

        self.errors.append(error_data)

        # 更新统计
        error_type = error_data['type']
        if error_type not in self.error_stats:
            self.error_stats[error_type] = {
                'count': 0,
                'last_occurrence': None
            }

        self.error_stats[error_type]['count'] += 1
        self.error_stats[error_type]['last_occurrence'] = error_data['timestamp']

        # 记录日志
        logger.error(
            f"Exception captured: {error_type}",
            extra=error_data
        )

        # 发送到 Sentry
        sentry_sdk.capture_exception(exception)

    def get_errors(self, limit=100):
        """获取错误列表"""
        return self.errors[-limit:]

    def get_error_stats(self):
        """获取错误统计"""
        return self.error_stats

    def get_frequent_errors(self, threshold=10):
        """获取频繁错误"""
        frequent_errors = []

        for error_type, stats in self.error_stats.items():
            if stats['count'] >= threshold:
                frequent_errors.append({
                    'type': error_type,
                    'count': stats['count'],
                    'last_occurrence': stats['last_occurrence']
                })

        return sorted(frequent_errors, key=lambda x: x['count'], reverse=True)

    def clear_errors(self):
        """清除错误"""
        self.errors = []
        self.error_stats = {}

def setup_error_monitoring(app, sentry_dsn):
    """设置错误监控"""

    # 初始化 Sentry
    sentry_sdk.init(
        dsn=sentry_dsn,
        integrations=[FlaskIntegration()],
        traces_sample_rate=1.0,
        profiles_sample_rate=1.0
    )

    error_monitor = ErrorMonitor(app)

    @app.errorhandler(Exception)
    def handle_exception(e):
        """处理异常"""
        error_monitor.capture_exception(e)

        if request.is_json:
            return {'error': str(e)}, 500
        else:
            return str(e), 500

    @app.errorhandler(404)
    def handle_not_found(e):
        """处理 404"""
        logger.warning(f"404 Not Found: {request.path}")
        return {'error': 'Not found'}, 404

    @app.errorhandler(500)
    def handle_server_error(e):
        """处理 500"""
        error_monitor.capture_exception(e)
        return {'error': 'Internal server error'}, 500

    @app.route('/admin/errors')
    def get_errors():
        """获取错误列表"""
        errors = error_monitor.get_errors()
        return {'errors': errors}

    @app.route('/admin/errors/stats')
    def get_error_stats():
        """获取错误统计"""
        stats = error_monitor.get_error_stats()
        return {'stats': stats}

    @app.route('/admin/errors/frequent')
    def get_frequent_errors():
        """获取频繁错误"""
        frequent_errors = error_monitor.get_frequent_errors()
        return {'frequent_errors': frequent_errors}

    return app

```## 预测性维护

### 1. 容量预测

# 示例:实现容量预测

用户请求:
"为应用实现容量预测"
Claude Code 生成的代码:

````python
`python
python
    import numpy as np
    from datetime import datetime, timedelta
    from typing import List, Dict
    import logging

    logger = logging.getLogger(__name__)

    class CapacityPredictor:
"""容量预测器"""
python
    def __init__(self):
    self.history = []
    self.predictions = {}

    def add_metric(self, timestamp: datetime, metric_name: str, value: float):
"""添加指标"""
python
    self.history.append({
    'timestamp': timestamp,
    'metric': metric_name,
    'value': value
    })

    def predict_capacity(self, metric_name: str, days: int = 7) -> List[Dict]:
"""预测容量"""
 # 获取历史数据
data = [
entry for entry in self.history
bash
    if entry['metric'] == metric_name
    ]

    if len(data) < 30:
    logger.warning(f"Insufficient data for {metric_name}")
    return []
 # 提取值
values = [entry['value'] for entry in data]

 # 计算趋势
trend = self._calculate_trend(values)

 # 预测未来值
predictions = []
bash
    for i in range(days):
    predicted_value = values[-1] + trend * (i + 1)
    predicted_date = datetime.utcnow() + timedelta(days=i + 1)

    predictions.append({
    'date': predicted_date.isoformat(),
    'value': predicted_value,
    'metric': metric_name
    })

    self.predictions[metric_name] = predictions

    return predictions

    def _calculate_trend(self, values: List[float]) -> float:
"""计算趋势"""
bash
    if len(values) < 2:
    return 0
 # 使用线性回归
x = np.arange(len(values))
y = np.array(values)

 # 计算斜率
slope = np.polyfit(x, y, 1)[0]
bash
    return slope

    def check_capacity_alerts(self, threshold: float = 0.9) -> List[Dict]:
"""检查容量告警"""
alerts = []
python
    for metric_name, predictions in self.predictions.items():
    for prediction in predictions:
    if prediction['value'] >= threshold:
    alerts.append({
    'metric': metric_name,
    'date': prediction['date'],
    'value': prediction['value'],
    'threshold': threshold
    })

    return sorted(alerts, key=lambda x: x['value'], reverse=True)

    def get_capacity_recommendations(self) -> List[Dict]:
"""获取容量建议"""
recommendations = []

alerts = self.check_capacity_alerts()
bash
    if alerts:
    recommendations.append({
    'type': 'scale_up',
    'message': f"发现 {len(alerts)} 个容量告警,建议扩容",
    'alerts': alerts
    })
 # 检查资源利用率
python
    for metric_name, predictions in self.predictions.items():
    avg_value = np.mean([p['value'] for p in predictions])

    if avg_value < 0.3:
    recommendations.append({
    'type': 'scale_down',
    'message': f"{metric_name} 平均利用率较低,建议缩容",
    'metric': metric_name,
    'avg_value': avg_value
    })

    return recommendations

    def run_capacity_prediction():
"""运行容量预测"""
predictor = CapacityPredictor()

 # 添加历史数据(示例)
now = datetime.utcnow()
bash
    for i in range(30):
    timestamp = now - timedelta(days=30 - i)
    value = 0.5 + (i / 100) + np.random.normal(0, 0.05)
    predictor.add_metric(timestamp, 'cpu_usage', value)
 # 预测容量
print("预测 CPU 使用率...")
predictions = predictor.predict_capacity('cpu_usage', days=7)

print("\n预测结果:")
bash
    for prediction in predictions:
    print(f"  {prediction['date']}: {prediction['value']:.2%}")
 # 检查告警
print("\n检查容量告警...")
alerts = predictor.check_capacity_alerts(threshold=0.9)
bash
    if alerts:
    print(f"发现 {len(alerts)} 个告警:")
    for alert in alerts:
    print(f"  {alert['date']}: {alert['value']:.2%} (阈值: {alert['threshold']:.0%})")
    else:
print("未发现容量告警")

 # 获取建议
print("\n获取容量建议...")
recommendations = predictor.get_capacity_recommendations()
bash
    for rec in recommendations:
    print(f"  {rec['type']}: {rec['message']}")

    if __name__ == '__main__':
    run_capacity_prediction()

基于 MIT 许可发布