Skip to content

27.2 复杂系统设计与实现

复杂系统设计概述

Claude Code 可以帮助开发者设计和实现复杂的软件系统。通过系统化的设计方法和分阶段的实现策略,开发者可以利用 Claude Code 构建可扩展、可维护的复杂系统。

系统架构设计

1. 微服务架构

示例:设计微服务架构

用户请求: "设计一个电商系统的微服务架构"

Claude Code 生成的架构设计:

系统架构

```text
┌─────────────────────────────────────────────────────────────┐
│                         API Gateway                         │
│             (负载均衡、认证、路由、限流、监控)              │
└──────────────────────────────┬──────────────────────────────┘
                               │
         ┌─────────────────────┼─────────────────────┐
         │                     │                     │
┌────────▼────────┐   ┌────────▼────────┐   ┌────────▼────────┐
│  User Service   │   │ Product Service │   │  Order Service  │
│  - 用户管理     │   │  - 商品管理     │   │  - 订单管理     │
│  - 认证授权     │   │  - 库存管理     │   │  - 支付处理     │
│  - 个人资料     │   │  - 分类管理     │   │  - 物流跟踪     │
└────────┬────────┘   └────────┬────────┘   └────────┬────────┘
         │                     │                     │
         └─────────────────────┼─────────────────────┘
                               │
         ┌─────────────────────┼─────────────────────┐
         │                     │                     │
┌────────▼────────┐   ┌────────▼────────┐   ┌────────▼────────┐
│ Payment Service │   │    Inventory    │   │  Notification   │
│  - 支付处理     │   │     Service     │   │     Service     │
│  - 退款处理     │   │  - 库存同步     │   │  - 邮件通知     │
│  - 交易记录     │   │  - 预警管理     │   │  - 短信通知     │
└────────┬────────┘   └────────┬────────┘   └────────┬────────┘
         │                     │                     │
         └─────────────────────┼─────────────────────┘
                               │
                     ┌─────────▼─────────┐
                     │  Message Broker   │
                     │ (Kafka/RabbitMQ)  │
                     └───────────────────┘
```

## 服务定义

### User Service (用户服务)

```python

    # services/user_service/app.py

    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    from flask_jwt_extended import JWTManager, create_access_token
    import bcrypt

    # Application object; settings are applied before any extension is bound.
    app = Flask(__name__)
    app.config.update(
        SQLALCHEMY_DATABASE_URI='postgresql://localhost/users_db',
        # Looks like a placeholder secret - presumably replaced per deployment.
        JWT_SECRET_KEY='your-secret-key',
    )

    # Extensions attached to the configured application.
    db = SQLAlchemy(app)
    jwt = JWTManager(app)

    class User(db.Model):
        """Account row stored in the ``users`` table."""
        __tablename__ = 'users'

        id = db.Column(db.Integer, primary_key=True)
        email = db.Column(db.String(120), unique=True, nullable=False)
        password_hash = db.Column(db.String(255), nullable=False)
        name = db.Column(db.String(100), nullable=False)
        phone = db.Column(db.String(20))
        created_at = db.Column(db.DateTime, server_default=db.func.now())

        def set_password(self, password):
            """Hash ``password`` with a fresh bcrypt salt and keep the hash as text."""
            raw = password.encode('utf-8')
            hashed = bcrypt.hashpw(raw, bcrypt.gensalt())
            self.password_hash = hashed.decode('utf-8')

        def check_password(self, password):
            """Return whether ``password`` matches the stored bcrypt hash."""
            candidate = password.encode('utf-8')
            stored = self.password_hash.encode('utf-8')
            return bcrypt.checkpw(candidate, stored)

    @app.route('/register', methods=['POST'])
    def register():
        """Create a user account from a JSON request body.

        The body must be a JSON object with non-empty string fields
        ``email``, ``name`` and ``password``; ``phone`` is optional.

        Returns:
            201 with ``{'user_id': ...}`` on success, 400 when the body is
            not a JSON object or a required field is missing, 409 when the
            email is already registered.
        """
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        required = ('email', 'name', 'password')
        missing = [
            field for field in required
            if not (isinstance(data.get(field), str) and data[field])
        ]
        if missing:
            return jsonify({
                'error': f"Missing required fields: {', '.join(missing)}"
            }), 400

        # The email column is unique: report a conflict rather than letting
        # the INSERT fail. A concurrent duplicate can still hit the constraint.
        if User.query.filter_by(email=data['email']).first() is not None:
            return jsonify({'error': 'Email already registered'}), 409

        user = User(
            email=data['email'],
            name=data['name'],
            phone=data.get('phone')
        )
        user.set_password(data['password'])

        db.session.add(user)
        db.session.commit()

        return jsonify({'user_id': user.id}), 201

    @app.route('/login', methods=['POST'])
    def login():
        """Check credentials and issue an access token.

        The body must be a JSON object with string fields ``email`` and
        ``password``.

        Returns:
            200 with ``access_token`` and a ``user`` summary on success,
            400 when the body is malformed, 401 when the credentials do not
            match a stored user.
        """
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        email = data.get('email')
        password = data.get('password')
        if not isinstance(email, str) or not isinstance(password, str):
            return jsonify({'error': 'email and password are required'}), 400

        user = User.query.filter_by(email=email).first()

        # One message for both failure modes, so a caller cannot tell
        # whether the email exists.
        if not user or not user.check_password(password):
            return jsonify({'error': 'Invalid credentials'}), 401

        access_token = create_access_token(identity=user.id)

        return jsonify({
            'access_token': access_token,
            'user': {
                'id': user.id,
                'email': user.email,
                'name': user.name
            }
        })

    @app.route('/users/<int:user_id>', methods=['GET'])
    def get_user(user_id):
        """Return id, email, name and phone of one user, or 404 if the id is unknown."""
        record = User.query.get_or_404(user_id)
        exposed = ('id', 'email', 'name', 'phone')
        return jsonify({field: getattr(record, field) for field in exposed})

```

### Product Service (商品服务)

```python

    # services/product_service/app.py
    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    from flask_migrate import Migrate
    from datetime import datetime

    # Application object; the database URI is set before the extensions bind.
    app = Flask(__name__)
    app.config.update(
        SQLALCHEMY_DATABASE_URI='postgresql://localhost/products_db',
    )

    # ORM handle and the migration extension built on top of it.
    db = SQLAlchemy(app)
    migrate = Migrate(app, db)

    class Product(db.Model):
        """Product row stored in the ``products`` table."""
        __tablename__ = 'products'

        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(200), nullable=False)
        description = db.Column(db.Text)
        # NOTE(review): Float is lossy for money; a fixed-point type may be
        # preferable - confirm before relying on exact totals.
        price = db.Column(db.Float, nullable=False)
        stock = db.Column(db.Integer, default=0)
        category_id = db.Column(db.Integer, db.ForeignKey('categories.id'))
        created_at = db.Column(db.DateTime, server_default=db.func.now())
        # onupdate refreshes the timestamp whenever the row is updated.
        updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

        # Adds ``Category.products`` as the reverse side of this relationship.
        category = db.relationship('Category', backref='products')

    class Category(db.Model):
        """Category row in the ``categories`` table; rows may nest via ``parent_id``."""
        __tablename__ = 'categories'

        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(100), nullable=False)
        parent_id = db.Column(db.Integer, db.ForeignKey('categories.id'))

        # Self-referential link: remote_side marks ``id`` as the parent end,
        # and the backref exposes the child categories as ``children``.
        parent = db.relationship('Category', remote_side=[id], backref='children')

    @app.route('/products', methods=['POST'])
    def create_product():
        """Create a product from a JSON request body.

        ``name`` and ``price`` are required; ``description``, ``stock``
        (default 0) and ``category_id`` are optional.

        Returns:
            201 with ``{'product_id': ...}`` on success, 400 when the body is
            not a JSON object or a required field is absent.
        """
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'name' not in data or 'price' not in data:
            return jsonify({'error': 'name and price are required'}), 400

        product = Product(
            name=data['name'],
            description=data.get('description'),
            price=data['price'],
            stock=data.get('stock', 0),
            category_id=data.get('category_id')
        )

        db.session.add(product)
        db.session.commit()

        return jsonify({'product_id': product.id}), 201

    @app.route('/products/<int:product_id>', methods=['GET'])
    def get_product(product_id):
        """Return one product as JSON, or 404 when the id is unknown.

        ``category`` holds the category name, or ``None`` when the product
        has no category.
        """
        product = Product.query.get_or_404(product_id)

        return jsonify({
            'id': product.id,
            'name': product.name,
            'description': product.description,
            'price': product.price,
            'stock': product.stock,
            'category': product.category.name if product.category else None
        })

    @app.route('/products/<int:product_id>/stock', methods=['PUT'])
    def update_stock(product_id):
        """Overwrite a product's stock level with the value from the JSON body.

        Returns:
            200 with ``{'stock': ...}`` on success, 400 when ``stock`` is
            missing or not a non-negative integer, 404 when the product id is
            unknown.
        """
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        stock = data.get('stock') if isinstance(data, dict) else None

        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
            return jsonify({'error': 'stock must be a non-negative integer'}), 400

        product = Product.query.get_or_404(product_id)
        product.stock = stock

        db.session.commit()

        return jsonify({'stock': product.stock})
```

### Order Service (订单服务)

```python

    # services/order_service/app.py

    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    from datetime import datetime
    import requests

    # Application object; the database URI is set before the ORM binds to it.
    app = Flask(__name__)
    app.config.update(
        SQLALCHEMY_DATABASE_URI='postgresql://localhost/orders_db',
    )

    db = SQLAlchemy(app)

    class Order(db.Model):
        """Order header row stored in the ``orders`` table."""
        __tablename__ = 'orders'

        id = db.Column(db.Integer, primary_key=True)
        # Plain integer; no foreign key is declared on this column.
        user_id = db.Column(db.Integer, nullable=False)
        total_amount = db.Column(db.Float, nullable=False)
        # Defaults to 'pending' when no status is supplied.
        status = db.Column(db.String(20), default='pending')
        created_at = db.Column(db.DateTime, server_default=db.func.now())
        # onupdate refreshes the timestamp whenever the row is updated.
        updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

    class OrderItem(db.Model):
        """One line of an order, stored in the ``order_items`` table."""
        __tablename__ = 'order_items'

        id = db.Column(db.Integer, primary_key=True)
        order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), nullable=False)
        # Plain integer; no foreign key is declared on this column.
        product_id = db.Column(db.Integer, nullable=False)
        quantity = db.Column(db.Integer, nullable=False)
        price = db.Column(db.Float, nullable=False)

        # Adds ``Order.items`` as the reverse side of this relationship.
        order = db.relationship('Order', backref='items')

    @app.route('/orders', methods=['POST'])
    def create_order():
        """Create an order after checking stock with the product service.

        The body must be a JSON object with ``user_id`` and a non-empty
        ``items`` list; every item carries ``product_id``, ``quantity`` and
        ``price``.

        Returns:
            201 with ``{'order_id': ...}`` on success; 400 for a malformed
            body or insufficient stock; 404 when a product is unknown; 502
            when the product service is unreachable or answers with an error.
        """
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        items = data.get('items') if isinstance(data, dict) else None
        if not isinstance(items, list) or not items or 'user_id' not in data:
            return jsonify({
                'error': 'user_id and a non-empty items list are required'
            }), 400

        item_fields = ('product_id', 'quantity', 'price')
        for item in items:
            if not isinstance(item, dict) or not all(key in item for key in item_fields):
                return jsonify({
                    'error': 'Each item needs product_id, quantity and price'
                }), 400

        product_api = 'http://product-service:5001/products'
        timeout = 5  # seconds; a stalled product service must not hang the request

        # Verify stock. Each product is fetched once, and the running remainder
        # is tracked so repeated lines for one product are checked together.
        remaining = {}
        for item in items:
            product_id = item['product_id']
            if product_id not in remaining:
                try:
                    response = requests.get(f"{product_api}/{product_id}", timeout=timeout)
                except requests.RequestException:
                    return jsonify({'error': 'Product service unavailable'}), 502
                if response.status_code == 404:
                    return jsonify({'error': f'Product {product_id} not found'}), 404
                if not response.ok:
                    return jsonify({'error': 'Product service error'}), 502
                remaining[product_id] = response.json()['stock']

            if remaining[product_id] < item['quantity']:
                return jsonify({
                    'error': f'Insufficient stock for product {product_id}'
                }), 400
            remaining[product_id] -= item['quantity']

        # Create the order; flush assigns order.id before the items reference it.
        order = Order(
            user_id=data['user_id'],
            total_amount=0
        )
        db.session.add(order)
        db.session.flush()

        # Create the order items.
        # NOTE(review): the price comes from the request body, not from the
        # product service - confirm that clients are trusted to set it.
        total_amount = 0
        for item in items:
            order_item = OrderItem(
                order_id=order.id,
                product_id=item['product_id'],
                quantity=item['quantity'],
                price=item['price']
            )
            db.session.add(order_item)
            total_amount += item['price'] * item['quantity']

        order.total_amount = total_amount
        db.session.commit()

        # Deduct stock. The check above and this write are not atomic, so a
        # concurrent order can still interleave; failures are logged because
        # the order is already committed at this point.
        for product_id, new_stock in remaining.items():
            try:
                result = requests.put(
                    f"{product_api}/{product_id}/stock",
                    json={'stock': new_stock},
                    timeout=timeout
                )
                result.raise_for_status()
            except requests.RequestException:
                app.logger.exception('Failed to update stock for product %s', product_id)

        return jsonify({'order_id': order.id}), 201

    @app.route('/orders/<int:order_id>', methods=['GET'])
    def get_order(order_id):
        """Return an order with its line items as JSON, or 404 when the id is unknown."""
        order = Order.query.get_or_404(order_id)

        payload = {
            'id': order.id,
            'user_id': order.user_id,
            'total_amount': order.total_amount,
            'status': order.status,
        }

        # One entry per OrderItem reachable through the ``items`` backref.
        line_items = []
        for entry in order.items:
            line_items.append({
                'product_id': entry.product_id,
                'quantity': entry.quantity,
                'price': entry.price,
            })
        payload['items'] = line_items

        return jsonify(payload)

```

## API Gateway 实现

```python

    # api_gateway/app.py
    from flask import Flask, request, jsonify
    import requests
    import jwt
    from functools import wraps

    app = Flask(__name__)

    # Routing table: first URL segment -> base URL of the backend service.
    SERVICES = dict(
        user='http://user-service:5000',
        product='http://product-service:5001',
        order='http://order-service:5002',
    )

    def authenticate(f):
        """Decorator that rejects requests lacking a valid JWT.

        The token is read from the ``Authorization`` header as
        ``<scheme> <token>`` and decoded with HS256. On success the user id
        is stored on ``request.user_id`` before ``f`` is called.

        Returns:
            The wrapped view. It answers 401 with ``{'error': ...}`` when the
            header is missing, malformed, or carries an invalid token.
        """
        @wraps(f)
        def decorated_function(*args, **kwargs):
            token = request.headers.get('Authorization')

            if not token:
                return jsonify({'error': 'Missing token'}), 401

            # A header without a second word would raise IndexError below.
            parts = token.split()
            if len(parts) < 2:
                return jsonify({'error': 'Invalid token'}), 401

            try:
                payload = jwt.decode(parts[1], 'your-secret-key', algorithms=['HS256'])
            except jwt.InvalidTokenError:
                return jsonify({'error': 'Invalid token'}), 401

            # Accept 'user_id' as before, falling back to the standard 'sub'
            # claim. Presumably tokens made by create_access_token(identity=...)
            # carry the identity under 'sub' - verify against the issuer config.
            user_id = payload.get('user_id', payload.get('sub'))
            if user_id is None:
                return jsonify({'error': 'Invalid token'}), 401
            request.user_id = user_id

            return f(*args, **kwargs)

        return decorated_function

    @app.route('/<service>/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
    def proxy(service, path):
        """Forward the incoming request to the backend registered under ``service``.

        Returns:
            The backend's JSON body and status code; 404 when ``service`` is
            not in ``SERVICES``; 502 when the backend is unreachable or does
            not answer with JSON.
        """
        if service not in SERVICES:
            return jsonify({'error': 'Service not found'}), 404

        service_url = SERVICES[service]
        url = f"{service_url}/{path}"

        # Host must name the backend, and Content-Length is recomputed because
        # the body is re-serialised below.
        skipped = {'host', 'content-length'}
        headers = {
            key: value for key, value in request.headers
            if key.lower() not in skipped
        }

        try:
            response = requests.request(
                method=request.method,
                url=url,
                headers=headers,
                # silent=True: requests without a JSON body forward None.
                json=request.get_json(silent=True),
                params=request.args,
                timeout=10
            )
        except requests.RequestException:
            return jsonify({'error': 'Upstream service unavailable'}), 502

        try:
            body = response.json()
        except ValueError:
            # The backend answered with something that is not JSON.
            return jsonify({'error': 'Invalid response from upstream service'}), 502

        return jsonify(body), response.status_code

    @app.route('/health')
    def health():
        """Liveness probe: always answers ``{'status': 'healthy'}``."""
        return jsonify({'status': 'healthy'})
```

### 2. 事件驱动架构

示例:设计事件驱动架构

用户请求: "设计一个基于事件驱动的订单处理系统"

Claude Code 生成的架构设计:

## 系统架构

```text
                       ┌──────────────┐
                       │   订单服务    │
                       │  (Order      │
                       │   Service)   │
                       └──────┬───────┘
                              │ 发布事件
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    Message Broker                           │
│                    (Kafka/RabbitMQ)                         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │ 订单创建  │  │ 订单支付  │  │ 订单发货  │  │ 订单完成  │  │
│  │ 事件     │  │ 事件     │  │ 事件     │  │ 事件     │  │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘  │
└─────────────────────────────────────────────────────────────┘
        │              │              │              │
        ▼              ▼              ▼              ▼
   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
   │ 库存服务  │   │ 支付服务  │   │ 物流服务  │   │ 通知服务  │
   │ Inventory│   │ Payment  │   │ Shipping │   │ Notify   │
   │ Service  │   │ Service  │   │ Service  │   │ Service  │
   └──────────┘   └──────────┘   └──────────┘   └──────────┘
```

## 事件定义

```python
    # events/order_events.py
    from dataclasses import dataclass
    from datetime import datetime
    from typing import Dict, Any
    import json
    @dataclass
    class OrderEvent:
        """Base record shared by every order event.

        Attributes are the event type string, the order id, the event
        timestamp, and an event-specific ``data`` payload.
        """
        event_type: str
        order_id: int
        timestamp: datetime
        data: Dict[str, Any]

        def to_dict(self) -> dict:
            """Return the event as a plain dict with the timestamp in ISO 8601 form."""
            return {
                'event_type': self.event_type,
                'order_id': self.order_id,
                'timestamp': self.timestamp.isoformat(),
                'data': self.data
            }

        def to_json(self) -> str:
            """Serialise ``to_dict()`` to a JSON string."""
            return json.dumps(self.to_dict())

    @dataclass
    class OrderCreatedEvent(OrderEvent):
        """Event emitted when an order is created (type ``order.created``)."""

        def __init__(self, order_id: int, user_id: int, total_amount: float, items: list):
            """Build the event; the timestamp is taken from ``datetime.utcnow()``."""
            super().__init__(
                event_type='order.created',
                order_id=order_id,
                timestamp=datetime.utcnow(),
                data={
                    'user_id': user_id,
                    'total_amount': total_amount,
                    'items': items
                }
            )

    @dataclass
    class OrderPaidEvent(OrderEvent):
        """Event emitted when an order is paid (type ``order.paid``)."""

        def __init__(self, order_id: int, payment_method: str, transaction_id: str):
            """Build the event; the timestamp is taken from ``datetime.utcnow()``."""
            super().__init__(
                event_type='order.paid',
                order_id=order_id,
                timestamp=datetime.utcnow(),
                data={
                    'payment_method': payment_method,
                    'transaction_id': transaction_id
                }
            )

    @dataclass
    class OrderShippedEvent(OrderEvent):
        """Event emitted when an order is shipped (type ``order.shipped``)."""

        def __init__(self, order_id: int, tracking_number: str, shipping_address: dict):
            """Build the event; the timestamp is taken from ``datetime.utcnow()``."""
            super().__init__(
                event_type='order.shipped',
                order_id=order_id,
                timestamp=datetime.utcnow(),
                data={
                    'tracking_number': tracking_number,
                    'shipping_address': shipping_address
                }
            )

    @dataclass
    class OrderCompletedEvent(OrderEvent):
        """Event emitted when an order is completed (type ``order.completed``)."""

        def __init__(self, order_id: int, delivery_time: datetime):
            """Build the event; ``delivery_time`` is stored in ISO 8601 form."""
            super().__init__(
                event_type='order.completed',
                order_id=order_id,
                timestamp=datetime.utcnow(),
                data={
                    'delivery_time': delivery_time.isoformat()
                }
            )
```

## 事件发布者

```python

    # services/order_service/event_publisher.py
    import pika
    import json
    from events.order_events import OrderEvent

    class EventPublisher:
        """Publishes order events to the ``order_events`` topic exchange."""

        def __init__(self, rabbitmq_url: str = 'amqp://localhost'):
            """Open a blocking connection and channel, then declare the exchange."""
            params = pika.URLParameters(rabbitmq_url)
            self.connection = pika.BlockingConnection(params)
            self.channel = self.connection.channel()

            # Declare the exchange this publisher sends to.
            self.channel.exchange_declare(exchange='order_events', exchange_type='topic')

        def publish(self, event: OrderEvent, routing_key: str):
            """Send ``event`` as JSON to the exchange under ``routing_key``."""
            payload = event.to_json()
            # delivery_mode=2 marks the message as persistent.
            persistent = pika.BasicProperties(delivery_mode=2)
            self.channel.basic_publish(
                exchange='order_events',
                routing_key=routing_key,
                body=payload,
                properties=persistent,
            )

        def close(self):
            """Close the underlying connection."""
            self.connection.close()

```

## 事件消费者

```python
    # services/inventory_service/event_consumer.py
    import pika
    import json
    import requests
    class InventoryEventConsumer:
        """Consumes ``order.created`` events and deducts stock via the product service."""

        def __init__(self, rabbitmq_url: str = 'amqp://localhost'):
            """Connect, then declare and bind the exchange and the inventory queue."""
            self.connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
            self.channel = self.connection.channel()

            # Declare the exchange and the queue, then bind them.
            self.channel.exchange_declare(
                exchange='order_events',
                exchange_type='topic'
            )
            self.channel.queue_declare(queue='inventory_queue', durable=True)
            self.channel.queue_bind(
                exchange='order_events',
                queue='inventory_queue',
                routing_key='order.created'
            )

        def handle_order_created(self, ch, method, properties, body):
            """Deduct stock for every item in the event, then acknowledge the message.

            ``body`` is parsed as JSON; items are read from ``event['data']['items']``.
            """
            event = json.loads(body)
            print(f"Received order.created event: {event}")

            # Deduct stock: read the current level, then write the reduced one.
            for item in event['data']['items']:
                response = requests.get(
                    f"http://product-service:5001/products/{item['product_id']}"
                )
                product = response.json()
                new_stock = product['stock'] - item['quantity']

                requests.put(
                    f"http://product-service:5001/products/{item['product_id']}/stock",
                    json={'stock': new_stock}
                )

            # Acknowledge only after every item has been processed.
            ch.basic_ack(delivery_tag=method.delivery_tag)

        def start_consuming(self):
            """Register the handler on ``inventory_queue`` and start consuming."""
            self.channel.basic_qos(prefetch_count=1)
            self.channel.basic_consume(
                queue='inventory_queue',
                on_message_callback=self.handle_order_created
            )

            print('Inventory service started consuming messages...')
            self.channel.start_consuming()

        def close(self):
            """Close the underlying connection."""
            self.connection.close()

```

## 事件驱动订单服务

```python

    # services/order_service/app.py
    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    from datetime import datetime
    from event_publisher import EventPublisher
    from events.order_events import OrderCreatedEvent

    # Application object; the database URI is set before the ORM binds to it.
    app = Flask(__name__)
    app.config.update(
        SQLALCHEMY_DATABASE_URI='postgresql://localhost/orders_db',
    )

    db = SQLAlchemy(app)
    # Built at import time with the publisher's default arguments.
    event_publisher = EventPublisher()

    class Order(db.Model):
        """Order header row stored in the ``orders`` table."""
        __tablename__ = 'orders'

        id = db.Column(db.Integer, primary_key=True)
        user_id = db.Column(db.Integer, nullable=False)
        total_amount = db.Column(db.Float, nullable=False)
        # Defaults to 'pending' when no status is supplied.
        status = db.Column(db.String(20), default='pending')
        created_at = db.Column(db.DateTime, server_default=db.func.now())
        # onupdate refreshes the timestamp whenever the row is updated.
        updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())

    class OrderItem(db.Model):
        """One line of an order, stored in the ``order_items`` table."""
        __tablename__ = 'order_items'

        id = db.Column(db.Integer, primary_key=True)
        order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), nullable=False)
        product_id = db.Column(db.Integer, nullable=False)
        quantity = db.Column(db.Integer, nullable=False)
        price = db.Column(db.Float, nullable=False)

        # Adds ``Order.items`` as the reverse side of this relationship.
        order = db.relationship('Order', backref='items')

    @app.route('/orders', methods=['POST'])
    def create_order():
        """Persist an order and publish an ``order.created`` event.

        The body must be a JSON object with ``user_id`` and a non-empty
        ``items`` list; every item carries ``product_id``, ``quantity`` and
        ``price``.

        Returns:
            201 with ``{'order_id': ...}`` on success, 400 for a malformed body.
        """
        # silent=True yields None instead of raising on a missing/invalid body.
        data = request.get_json(silent=True)
        raw_items = data.get('items') if isinstance(data, dict) else None
        if not isinstance(raw_items, list) or not raw_items or 'user_id' not in data:
            return jsonify({
                'error': 'user_id and a non-empty items list are required'
            }), 400

        item_fields = ('product_id', 'quantity', 'price')
        for item in raw_items:
            if not isinstance(item, dict) or not all(key in item for key in item_fields):
                return jsonify({
                    'error': 'Each item needs product_id, quantity and price'
                }), 400

        # Create the order; flush assigns order.id before the items reference it.
        order = Order(
            user_id=data['user_id'],
            total_amount=0
        )
        db.session.add(order)
        db.session.flush()

        # Create the order items and collect the event payload alongside.
        total_amount = 0
        items = []
        for item in raw_items:
            order_item = OrderItem(
                order_id=order.id,
                product_id=item['product_id'],
                quantity=item['quantity'],
                price=item['price']
            )
            db.session.add(order_item)
            total_amount += item['price'] * item['quantity']
            items.append({
                'product_id': item['product_id'],
                'quantity': item['quantity'],
                'price': item['price']
            })

        order.total_amount = total_amount
        db.session.commit()

        # Publish the event only after the commit.
        # NOTE(review): if publishing fails here the order is already stored
        # without its event - confirm whether that is acceptable.
        event = OrderCreatedEvent(
            order_id=order.id,
            user_id=data['user_id'],
            total_amount=total_amount,
            items=items
        )
        event_publisher.publish(event, 'order.created')

        return jsonify({'order_id': order.id}), 201

    # When run as a script, serve on all interfaces on port 5002.
    if __name__ == '__main__':
        app.run(host='0.0.0.0', port=5002)

```

基于 MIT 许可发布