Skip to content

19.4 性能与安全对比

19.4.1 性能对比

1. 执行性能

Skills 执行性能

Skills 的执行性能主要受限于 LLM 的推理时间和网络延迟。

性能特征

python
    python

    # Skills 执行时间分解
    class SkillPerformanceMetrics:
        def __init__(self):
            self.metrics = {
                "intent_parsing": 0,      # 意图解析时间
                "parameter_extraction": 0,  # 参数提取时间
                "skill_execution": 0,       # 技能执行时间
                "result_generation": 0,     # 结果生成时间
                "total_time": 0            # 总时间
            }

        def measure_execution(self, skill, user_input):
            """测量执行时间"""
            start_time = time.time()

            # 意图解析
            intent_start = time.time()
            intent = self.parse_intent(user_input)
            self.metrics["intent_parsing"] = time.time() - intent_start

            # 参数提取
            param_start = time.time()
            parameters = self.extract_parameters(user_input, intent)
            self.metrics["parameter_extraction"] = time.time() - param_start

            # 技能执行
            exec_start = time.time()
            result = skill.execute(parameters)
            self.metrics["skill_execution"] = time.time() - exec_start

            # 结果生成
            gen_start = time.time()
            formatted_result = self.format_result(result)
            self.metrics["result_generation"] = time.time() - gen_start

            self.metrics["total_time"] = time.time() - start_time

            return formatted_result

典型执行时间(毫秒)

意图解析:300-500ms

参数提取:200-400ms

技能执行:100-300ms

结果生成:400-600ms

总时间:1000-1800ms

性能优化策略

python
    python

    class OptimizedSkill(Skill):
        def __init__(self):
            super().__init__()
            self.cache = {}  # 缓存常用请求
            self.intent_classifier = self.load_intent_classifier()  # 预训练分类器

        def parse_intent(self, user_input):
            """优化意图解析"""
            # 1. 检查缓存
            cache_key = self.get_cache_key(user_input)
            if cache_key in self.cache:
                return self.cache[cache_key]

            # 2. 使用预训练分类器(快速)
            intent = self.intent_classifier.predict(user_input)

            # 3. 缓存结果
            self.cache[cache_key] = intent

            return intent

        def execute(self, parameters, context):
            """优化执行"""
            # 批处理请求
            if isinstance(parameters, list):
                return self.batch_execute(parameters, context)

            # 单个请求执行
            return super().execute(parameters, context)

        def batch_execute(self, parameters_list, context):
            """批量执行"""
            results = []
            for parameters in parameters_list:
                result = super().execute(parameters, context)
                results.append(result)
            return results

插件的执行性能主要取决于代码效率和系统资源。

性能特征

插件执行时间分解

python
class PluginPerformanceMetrics: def **init**(self): self.metrics = { "parameter_validation": 0, # 参数验证时间 "method_execution": 0, # 方法执行时间 "result_processing": 0, # 结果处理时间 "total_time": 0 # 总时间 } def measure_execution(self, plugin, method_name, parameters): """测量执行时间""" start_time = time.time()

参数验证

valid_start = time.time() self.validate_parameters(plugin, method_name, parameters) self.metrics["parameter_validation"] = time.time() - valid_start

方法执行

exec_start = time.time() result = getattr(plugin, method_name)(**parameters) self.metrics["method_execution"] = time.time() - exec_start

结果处理

proc_start = time.time() formatted_result = self.process_result(result) self.metrics["result_processing"] = time.time() - proc_start self.metrics["total_time"] = time.time() - start_time return formatted_result

典型执行时间(毫秒)

参数验证:5-15ms

方法执行:20-100ms

结果处理:5-10ms

总时间:30-125ms

bash

    **性能优化策略**:

```python
```python
    class OptimizedPlugin(Plugin):
        def __init__(self):
            super().__init__()
            self.cache = LRUCache(maxsize=1000)  # LRU 缓存
            self.connection_pool = ConnectionPool(size=10)  # 连接池
            self.thread_pool = ThreadPoolExecutor(max_workers=4)  # 线程池

        def execute(self, method_name, parameters):
        """优化执行"""
        # 1. 检查缓存
        cache_key = self.get_cache_key(method_name, parameters)
        cached_result = self.cache.get(cache_key)
bash
            if cached_result:
                return cached_result
        # 2. 使用连接池
python
            with self.connection_pool.get_connection() as conn:
            # 3. 异步执行
            future = self.thread_pool.submit(
python
                    self._execute_method,
                    method_name,
                    parameters,
                    conn
                )
                result = future.result(timeout=10)
        # 4. 缓存结果
python
            self.cache.set(cache_key, result)

            return result

        def _execute_method(self, method_name, parameters, conn):
        """执行方法"""
        method = getattr(self, method_name)
bash
            return method(**parameters, connection=conn)
### 2. 并发性能

#### Skills 并发性能

Skills 的并发性能受限于 LLM API 的并发限制。
python
    class SkillConcurrencyManager:
    def __init__(self, max_concurrent=5):
    self.semaphore = asyncio.Semaphore(max_concurrent)
    self.request_queue = asyncio.Queue()
    async def execute_skill(self, skill, user_input):
"""异步执行技能"""
python
    async with self.semaphore:
    return await skill.execute_async(user_input)
    async def batch_execute(self, skill, user_inputs):
"""批量执行"""
tasks = [
python
    self.execute_skill(skill, user_input)
    for user_input in user_inputs
    ]
    return await asyncio.gather(*tasks)

## 性能指标

## 单个请求:1000-1800ms

## 并发 5 个:1200-2000ms(每个)

## 并发 10 个:1500-2500ms(每个)

## 并发 20 个:2000-3500ms(每个)

```python
    #### 插件并发性能
    插件的并发性能主要取决于系统资源和代码设计。

    ```python

    class PluginConcurrencyManager:
        def __init__(self, max_workers=10):
            self.executor = ThreadPoolExecutor(max_workers=max_workers)
            self.async_executor = ProcessPoolExecutor(max_workers=max_workers)

        def execute_plugin(self, plugin, method_name, parameters):
            """同步执行"""
            future = self.executor.submit(
                self._execute_sync,
                plugin,
                method_name,
                parameters
            )
            return future.result(timeout=30)

        async def execute_plugin_async(self, plugin, method_name, parameters):
            """异步执行"""
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(
                self.executor,
                self._execute_sync,
                plugin,
                method_name,
                parameters
            )

        def batch_execute(self, plugin, method_name, parameters_list):
            """批量执行"""
            futures = [
                self.executor.submit(
                    self._execute_sync,
                    plugin,
                    method_name,
                    parameters
                )
                for parameters in parameters_list
            ]
            return [future.result() for future in futures]

性能指标

单个请求:30-125ms

并发 10 个:40-150ms(每个)

并发 50 个:60-200ms(每个)

并发 100 个:100-300ms(每个)

3. 资源消耗

Skills 资源消耗

python
class SkillResourceMonitor: def **init**(self): self.metrics = { "cpu_usage": [], # CPU 使用率 "memory_usage": [], # 内存使用 "network_io": [], # 网络 I/O "api_calls": 0 # API 调用次数 } def monitor_execution(self, skill, user_input): """监控执行过程""" import psutil

记录初始状态

process = psutil.Process() initial_cpu = process.cpu_percent() initial_memory = process.memory_info().rss

执行技能

result = skill.execute(user_input)

记录最终状态

final_cpu = process.cpu_percent() final_memory = process.memory_info().rss

计算资源消耗

python
self.metrics["cpu_usage"].append(final_cpu - initial_cpu) self.metrics["memory_usage"].append(final_memory - initial_memory) self.metrics["api_calls"] += 1 return result

典型资源消耗

CPU 使用率:10-30%

内存使用:100-500MB

网络 I/O:1-5MB/请求

API 调用:2-4 次/请求

bash

    #### 插件资源消耗

```python
```python
    class PluginResourceMonitor:
        def __init__(self):
            self.metrics = {
                "cpu_usage": [],      # CPU 使用率
                "memory_usage": [],    # 内存使用
                "disk_io": [],        # 磁盘 I/O
                "network_io": []      # 网络 I/O
            }

        def monitor_execution(self, plugin, method_name, parameters):
        """监控执行过程"""
python
            import psutil
        # 记录初始状态
        process = psutil.Process()
        initial_cpu = process.cpu_percent()
        initial_memory = process.memory_info().rss
        initial_disk_io = process.io_counters()
        initial_network_io = psutil.net_io_counters()

        # 执行插件方法
        result = getattr(plugin, method_name)(**parameters)

        # 记录最终状态
        final_cpu = process.cpu_percent()
        final_memory = process.memory_info().rss
        final_disk_io = process.io_counters()
        final_network_io = psutil.net_io_counters()

        # 计算资源消耗
python
            self.metrics["cpu_usage"].append(final_cpu - initial_cpu)
            self.metrics["memory_usage"].append(final_memory - initial_memory)
            self.metrics["disk_io"].append(
                final_disk_io.write_bytes - initial_disk_io.write_bytes
            )
            self.metrics["network_io"].append(
                final_network_io.bytes_sent - initial_network_io.bytes_sent
            )

            return result

## 典型资源消耗

## CPU 使用率:5-15%

## 内存使用:50-200MB

## 磁盘 I/O:100KB-10MB/请求

## 网络 I/O:100KB-1MB/请求

### 4\. 性能对比表

性能指标| Skills| 插件
---|---|---
单次执行时间| 1000-1800ms| 30-125ms
并发能力| 低(5-10)| 高(50-100+)
CPU 使用率| 10-30%| 5-15%
内存使用| 100-500MB| 50-200MB
网络 I/O| 1-5MB/请求| 100KB-1MB/请求
可扩展性| 中| 高
吞吐量| 低| 高
延迟| 高| 低

## 19.4.2 安全对比

### 1\. 输入安全

#### Skills 输入安全

Skills 需要防范提示注入攻击和敏感信息泄露。

class SkillInputSecurity: def **init**(self): self.injection_patterns = [ r"ignore.*instructions", r"forget.*previous", r"new.*instructions", r"override.*system" ] self.sensitive_keywords = [ "password", "token", "api_key", "secret", "credential" ] def sanitize_input(self, user_input): """清理输入""" import re

## 1\. 检测注入攻击

for pattern in self.injection_patterns: if re.search(pattern, user_input, re.IGNORECASE): raise SecurityError("检测到潜在的注入攻击")

## 2\. 移除敏感信息

sanitized = user_input for keyword in self.sensitive_keywords:

## 使用占位符替换敏感信息

pattern = rf"{keyword}\s*[:=]\s*[^\s]+" sanitized = re.sub(pattern, f"{keyword}=_**REDACTED**_ ", sanitized, flags=re.IGNORECASE)

## 3\. 限制输入长度

max_length = 10000 if len(sanitized) > max_length: sanitized = sanitized[:max_length] return sanitized def validate_input(self, user_input): """验证输入"""

## 检查恶意代码

if self.contains_malicious_code(user_input): raise SecurityError("输入包含恶意代码")

## 检查特殊字符

if self.contains_dangerous_characters(user_input): raise SecurityError("输入包含危险字符") return True

```bash

    #### 插件输入安全
    插件通过类型系统和验证规则确保输入安全。

```python
    from pydantic import BaseModel, validator, constr
    from typing import Optional

    class SecureInput(BaseModel):
        """安全输入模型"""
        username: constr(min_length=3, max_length=50)
        email: str
        message: constr(max_length=1000)

        @validator('username')
        def username_alphanumeric(cls, v):
            if not v.isalnum():
                raise ValueError('用户名只能包含字母和数字')
            return v

        @validator('email')
        def email_valid(cls, v):
            import re
            if not re.match(r'^[^@]+@[^@]+\.[^@]+$', v):
                raise ValueError('邮箱格式无效')
            return v

        @validator('message')
        def message_safe(cls, v):
            # 检查 XSS 攻击
            dangerous_patterns = ['<script>', 'javascript:', 'onerror=']
            for pattern in dangerous_patterns:
                if pattern.lower() in v.lower():
                    raise ValueError('消息包含潜在危险内容')
            return v

    class PluginInputSecurity:
        def __init__(self):
            self.input_model = SecureInput

        def validate_input(self, parameters):
            """验证输入"""
            try:
                validated = self.input_model(**parameters)
                return validated.dict()
            except Exception as e:
                raise SecurityError(f"输入验证失败: {str(e)}")

    ### 2. 输出安全

    #### Skills 输出安全

    Skills 需要防止输出敏感信息和有害内容。

    class SkillOutputSecurity:
    def __init__(self):
    self.sensitive_patterns = [
    r"password\s*[:=]\s*\S+",
    r"token\s*[:=]\s*\S+",
    r"api[_-]?key\s*[:=]\s*\S+"
    ]
    self.harmful_categories = [
    "violence", "hate", "sexual",
    "self-harm", "illegal"
    ]
    def sanitize_output(self, output):
    """清理输出"""
    import re
    # 1. 移除敏感信息
    sanitized = output
    for pattern in self.sensitive_patterns:
    sanitized = re.sub(pattern, "***REDACTED***", sanitized, flags=re.IGNORECASE)
    # 2. 检查有害内容
    if self.contains_harmful_content(sanitized):
    sanitized = self.add_warning(sanitized, "内容可能包含有害信息")
    # 3. 限制输出长度
    max_length = 50000
    if len(sanitized) > max_length:
    sanitized = sanitized[:max_length] + "\n\n[输出已截断]"
    return sanitized
    def contains_harmful_content(self, text):
    """检查有害内容"""

使用内容审核 API

这里简化实现

harmful_keywords = ["暴力", "仇恨", "非法"] for keyword in harmful_keywords: if keyword in text: return True return False def add_warning(self, text, warning): """添加警告""" return f"[警告: {warning}]\n\n{text}"

bash

    #### 插件输出安全
    插件通过数据过滤和编码确保输出安全。

```python
```python
    class PluginOutputSecurity:
        def __init__(self):
            self.html_escape_map = {
                '&': '&amp;',
                '<': '&lt;',
                '>': '&gt;',
                '"': '&quot;',
                "'": '&#x27;'
            }

        def sanitize_output(self, output):
        """清理输出"""
        # 1. HTML 转义
bash
            if isinstance(output, str):
                output = self.html_escape(output)
        # 2. JSON 编码
bash
            if isinstance(output, dict):
                output = self.json_encode(output)
        # 3. 移除敏感字段
bash
            if isinstance(output, dict):
                output = self.remove_sensitive_fields(output)

            return output

        def html_escape(self, text):
            """HTML 转义"""
            result = []
            for char in text:
                result.append(self.html_escape_map.get(char, char))
            return ''.join(result)

        def json_encode(self, data):
            """JSON 编码"""
            import json
            return json.dumps(data, ensure_ascii=False)

        def remove_sensitive_fields(self, data):
        """移除敏感字段"""
        sensitive_fields = ['password', 'token', 'secret']
bash
            return {
                k: v for k, v in data.items()
                if k not in sensitive_fields
            }
### 3. 访问控制

#### Skills 访问控制

Skills 的访问控制相对宽松,主要依赖 LLM 的理解能力。
python
    class SkillAccessControl:
    def __init__(self):
    self.permissions = {
    "admin": ["all"],
    "user": ["read", "write"],
    "guest": ["read"]
    }
    def check_permission(self, user, skill_name, action):
"""检查权限"""
user_role = user.get("role", "guest")
user_permissions = self.permissions.get(user_role, [])
bash
    if "all" in user_permissions:
    return True
    if action in user_permissions:
    return True
    raise PermissionError(f"用户 {user['username']} 无权限执行 {action}")
    def filter_sensitive_data(self, user, data):
"""过滤敏感数据"""
user_role = user.get("role", "guest")
bash
    if user_role == "admin":
    return data
# 移除敏感字段
filtered = data.copy()
sensitive_fields = ["internal_notes", "admin_only"]
bash
    for field in sensitive_fields:
    filtered.pop(field, None)
    return filtered

#### 插件访问控制

插件可以实现细粒度的访问控制。

```python

```python
    from functools import wraps

    class PluginAccessControl:
        def __init__(self):
            self.roles = {
                "admin": {"priority": 100},
                "user": {"priority": 50},
                "guest": {"priority": 10}
            }
            self.permissions = {}

        def require_permission(self, permission):
        """权限装饰器"""
python
            def decorator(func):
                @wraps(func)
                def wrapper(self, *args, **kwargs):
                    user = kwargs.get('user', None)
                    if not user:
                        raise PermissionError("需要用户认证")

                    if not self.has_permission(user, permission):
                        raise PermissionError(f"缺少权限: {permission}")

                    return func(self, *args, **kwargs)
                return wrapper
            return decorator

        def has_permission(self, user, permission):
        """检查权限"""
        user_role = user.get("role", "guest")
        role_info = self.roles.get(user_role, {})
        user_permissions = self.permissions.get(user_role, set())
bash
            return permission in user_permissions or "all" in user_permissions

        def add_permission(self, role, permission):
        """添加权限"""
python
            if role not in self.permissions:
                self.permissions[role] = set()
            self.permissions[role].add(permission)
# 使用示例
python
    class SecurePlugin(Plugin):
        def __init__(self):
            super().__init__()
            self.access_control = PluginAccessControl()

        @PluginAccessControl.require_permission("read")
        def read_data(self, user, data_id):
        """读取数据"""
python
            return self._read_data(data_id)

        @PluginAccessControl.require_permission("write")
        def write_data(self, user, data_id, data):
        """写入数据"""
python
            return self._write_data(data_id, data)
### 4. 审计日志

#### Skills 审计日志
python
    class SkillAuditLogger:
    def __init__(self):
    self.logs = []
    def log_execution(self, skill_name, user_input, result, user):
"""记录执行"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"skill_name": skill_name,
"user": user.get("username", "anonymous"),
"input": self.sanitize_input(user_input),
"output": self.sanitize_output(result),
"success": result.get("success", False),
"duration": result.get("duration", 0)
}
python
    self.logs.append(log_entry)
    def get_logs(self, user=None, skill_name=None, start_time=None, end_time=None):
"""获取日志"""
filtered_logs = self.logs
bash
    if user:
    filtered_logs = [log for log in filtered_logs if log["user"] == user]
    if skill_name:
    filtered_logs = [log for log in filtered_logs if log["skill_name"] == skill_name]
    if start_time:
    filtered_logs = [log for log in filtered_logs if log["timestamp"] >= start_time]
    if end_time:
    filtered_logs = [log for log in filtered_logs if log["timestamp"] <= end_time]
    return filtered_logs

#### 插件审计日志

```python

```python
    class PluginAuditLogger:
        def __init__(self):
            self.logs = []
            self.max_logs = 10000

        def log_execution(self, plugin_name, method_name, parameters, result, user):
        """记录执行"""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "plugin_name": plugin_name,
            "method_name": method_name,
            "user": user.get("username", "anonymous"),
            "parameters": self.sanitize_parameters(parameters),
            "result": self.sanitize_result(result),
            "success": result.get("success", False),
            "duration": result.get("duration", 0),
            "ip_address": user.get("ip_address", "unknown")
        }
python
            self.logs.append(log_entry)
        # 限制日志数量
python
            if len(self.logs) > self.max_logs:
                self.logs = self.logs[-self.max_logs:]

        def export_logs(self, format="json"):
        """导出日志"""
bash
            if format == "json":
                import json
                return json.dumps(self.logs, indent=2)
            elif format == "csv":
                import csv
                import io
                output = io.StringIO()
                writer = csv.DictWriter(output, fieldnames=self.logs[0].keys())
                writer.writeheader()
                writer.writerows(self.logs)
                return output.getvalue()

### 5\. 安全对比表

安全特性| Skills| 插件
---|---|---
输入验证| 弱| 强
输出过滤| 中| 强
访问控制| 弱| 强
审计日志| 中| 强
类型安全| 无| 强
注入防护| 弱| 强
敏感信息保护| 中| 强
代码审查| 困难| 容易
安全测试| 困难| 容易
合规性| 中| 强

基于 MIT 许可发布