Skip to content

25.2 代码生成与补全

代码生成与补全概述

Claude Code 提供了强大的代码生成与补全功能,可以根据自然语言描述、代码片段、API 规范等生成高质量的代码,并智能补全未完成的代码。

代码生成

1. 从自然语言生成代码

示例:从自然语言生成代码

用户请求: "创建一个用户认证系统,支持注册、登录、密码重置功能"

Claude Code 生成的代码:

```python
    import os
    import secrets
    from datetime import datetime, timedelta

    import bcrypt
    from flask import Flask, request, jsonify
    from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity
    from flask_mail import Mail, Message
    from flask_sqlalchemy import SQLAlchemy

    # Application setup. Secrets and connection settings are read from the
    # environment when present; the literal fallbacks are the original
    # placeholder values and must not be used in a real deployment.
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('SQLALCHEMY_DATABASE_URI', 'sqlite:///auth.db')
    app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY', 'your-secret-key')
    app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=24)
    app.config['MAIL_SERVER'] = os.environ.get('MAIL_SERVER', 'smtp.gmail.com')
    app.config['MAIL_PORT'] = int(os.environ.get('MAIL_PORT', 587))
    app.config['MAIL_USE_TLS'] = True
    app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME', 'your-email@gmail.com')
    app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD', 'your-password')

    # Bind the extensions to the configured application.
    db = SQLAlchemy(app)
    jwt = JWTManager(app)
    mail = Mail(app)

    class User(db.Model):
        """User account model."""
        # Explicit table name: the PasswordReset model's foreign key targets
        # 'users.id', while the name derived from the class would be 'user'.
        __tablename__ = 'users'

        id = db.Column(db.Integer, primary_key=True)
        username = db.Column(db.String(80), unique=True, nullable=False)
        email = db.Column(db.String(120), unique=True, nullable=False)
        password_hash = db.Column(db.String(128), nullable=False)
        is_active = db.Column(db.Boolean, default=True)
        created_at = db.Column(db.DateTime, default=datetime.utcnow)
        updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

        # Reset-token columns kept on the user row.
        reset_token = db.Column(db.String(100), unique=True)
        reset_token_expires = db.Column(db.DateTime)

    class PasswordReset(db.Model):
        """Password reset record: one row per issued reset token."""
        id = db.Column(db.Integer, primary_key=True)
        user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
        token = db.Column(db.String(100), unique=True, nullable=False)
        expires_at = db.Column(db.DateTime, nullable=False)
        used = db.Column(db.Boolean, default=False)
        created_at = db.Column(db.DateTime, default=datetime.utcnow)

    @app.route('/register', methods=['POST'])
    def register():
        """Register a new user and return an access token.

        Expects a JSON object with ``username``, ``email`` and ``password``.

        Returns:
            A ``(response, status)`` pair: 201 with the new user's public
            fields and an access token, or 400 with an ``error`` message when
            a field is missing, the username or email is already taken, or the
            password is shorter than 8 characters.
        """
        # silent=True returns None instead of aborting on a missing or
        # malformed JSON body; anything that is not an object counts as empty.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # Validate input
        if not data.get('username') or not data.get('email') or not data.get('password'):
            return jsonify({'error': 'Missing required fields'}), 400

        # Check whether the username already exists
        if User.query.filter_by(username=data['username']).first():
            return jsonify({'error': 'Username already exists'}), 400

        # Check whether the email already exists
        if User.query.filter_by(email=data['email']).first():
            return jsonify({'error': 'Email already exists'}), 400

        # Validate password strength
        if len(data['password']) < 8:
            return jsonify({'error': 'Password must be at least 8 characters'}), 400

        # Create the user; only the bcrypt hash of the password is stored
        user = User(
            username=data['username'],
            email=data['email'],
            password_hash=bcrypt.hashpw(
                data['password'].encode('utf-8'),
                bcrypt.gensalt()
            ).decode('utf-8')
        )

        db.session.add(user)
        db.session.commit()

        # Generate the access token
        # NOTE(review): some Flask-JWT-Extended/PyJWT releases may require a
        # string identity — confirm against the pinned versions.
        access_token = create_access_token(identity=user.id)

        return jsonify({
            'message': 'User registered successfully',
            'user': {
                'id': user.id,
                'username': user.username,
                'email': user.email
            },
            'access_token': access_token
        }), 201

    @app.route('/login', methods=['POST'])
    def login():
        """Authenticate a user and return an access token.

        Expects a JSON object with ``username`` and ``password``.

        Returns:
            The user's public fields and an access token on success; 400 when
            a field is missing, 401 when the credentials do not match, 403
            when the account is deactivated.
        """
        # silent=True returns None instead of aborting on a missing or
        # malformed JSON body; anything that is not an object counts as empty.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # Validate input
        if not data.get('username') or not data.get('password'):
            return jsonify({'error': 'Missing username or password'}), 400

        # Look up the user; the same message is used for "unknown user" and
        # "wrong password"
        user = User.query.filter_by(username=data['username']).first()

        if not user or not bcrypt.checkpw(
            data['password'].encode('utf-8'),
            user.password_hash.encode('utf-8')
        ):
            return jsonify({'error': 'Invalid username or password'}), 401

        if not user.is_active:
            return jsonify({'error': 'Account is deactivated'}), 403

        # Generate the access token
        access_token = create_access_token(identity=user.id)

        return jsonify({
            'message': 'Login successful',
            'user': {
                'id': user.id,
                'username': user.username,
                'email': user.email
            },
            'access_token': access_token
        })

    @app.route('/me', methods=['GET'])
    @jwt_required()
    def get_current_user():
        """Return the profile of the user identified by the request's JWT.

        Returns:
            The user's id, username, email, active flag and creation time
            (ISO 8601), or 404 when no user matches the token identity.
        """
        user_id = get_jwt_identity()
        user = User.query.get(user_id)

        if not user:
            return jsonify({'error': 'User not found'}), 404

        return jsonify({
            'id': user.id,
            'username': user.username,
            'email': user.email,
            'is_active': user.is_active,
            'created_at': user.created_at.isoformat()
        })

    @app.route('/request-password-reset', methods=['POST'])
    def request_password_reset():
        """Issue a password reset token and email the reset link.

        Expects a JSON object with ``email``.

        Returns:
            400 when the email is missing; otherwise 200 with the same generic
            message whether or not the email belongs to an account.
        """
        # silent=True returns None instead of aborting on a missing or
        # malformed JSON body; anything that is not an object counts as empty.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        if not data.get('email'):
            return jsonify({'error': 'Email is required'}), 400

        user = User.query.filter_by(email=data['email']).first()

        if not user:
            # For security, do not reveal whether the email exists
            return jsonify({
                'message': 'If the email exists, a reset link has been sent'
            }), 200

        # Generate the reset token (valid for one hour)
        reset_token = secrets.token_urlsafe(32)
        reset_token_expires = datetime.utcnow() + timedelta(hours=1)

        # Persist the reset token
        password_reset = PasswordReset(
            user_id=user.id,
            token=reset_token,
            expires_at=reset_token_expires
        )

        db.session.add(password_reset)
        db.session.commit()

        # Send the reset email
        reset_link = f"http://localhost:3000/reset-password?token={reset_token}"

        msg = Message(
            'Password Reset Request',
            # Set the sender explicitly: no MAIL_DEFAULT_SENDER is configured
            # in this file, so fall back to the SMTP account name.
            sender=app.config.get('MAIL_DEFAULT_SENDER') or app.config['MAIL_USERNAME'],
            recipients=[user.email]
        )
        msg.body = (
            f'\nHello {user.username},\n\n'
            'You have requested to reset your password. '
            'Click the link below to reset your password:\n\n'
            f'{reset_link}\n\n'
            'This link will expire in 1 hour.\n\n'
            'If you did not request this password reset, please ignore this email.\n'
        )

        mail.send(msg)

        return jsonify({
            'message': 'If the email exists, a reset link has been sent'
        }), 200

    @app.route('/reset-password', methods=['POST'])
    def reset_password():
        """Set a new password using a previously issued reset token.

        Expects a JSON object with ``token`` and ``password``.

        Returns:
            200 on success; 400 when a field is missing, the token is unknown,
            already used or expired, the token's user no longer exists, or the
            new password is shorter than 8 characters.
        """
        # silent=True returns None instead of aborting on a missing or
        # malformed JSON body; anything that is not an object counts as empty.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        if not data.get('token') or not data.get('password'):
            return jsonify({'error': 'Token and password are required'}), 400

        # Look up an unused reset record for this token
        password_reset = PasswordReset.query.filter_by(
            token=data['token'],
            used=False
        ).first()

        if not password_reset:
            return jsonify({'error': 'Invalid or expired reset token'}), 400

        if password_reset.expires_at < datetime.utcnow():
            return jsonify({'error': 'Reset token has expired'}), 400

        # Validate password strength
        if len(data['password']) < 8:
            return jsonify({'error': 'Password must be at least 8 characters'}), 400

        # Update the password
        user = User.query.get(password_reset.user_id)
        if not user:
            # The account behind the token is gone; treat the token as invalid.
            return jsonify({'error': 'Invalid or expired reset token'}), 400

        user.password_hash = bcrypt.hashpw(
            data['password'].encode('utf-8'),
            bcrypt.gensalt()
        ).decode('utf-8')

        # Mark the reset token as used so it cannot be replayed
        password_reset.used = True

        db.session.commit()

        return jsonify({'message': 'Password reset successful'}), 200

    @app.route('/change-password', methods=['POST'])
    @jwt_required()
    def change_password():
        """Change the password of the user identified by the request's JWT.

        Expects a JSON object with ``current_password`` and ``new_password``.

        Returns:
            200 on success; 400 when a field is missing, the current password
            is wrong, or the new password is shorter than 8 characters; 404
            when no user matches the token identity.
        """
        # silent=True returns None instead of aborting on a missing or
        # malformed JSON body; anything that is not an object counts as empty.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        user_id = get_jwt_identity()

        if not data.get('current_password') or not data.get('new_password'):
            return jsonify({'error': 'Current password and new password are required'}), 400

        user = User.query.get(user_id)
        if not user:
            return jsonify({'error': 'User not found'}), 404

        # Verify the current password
        if not bcrypt.checkpw(
            data['current_password'].encode('utf-8'),
            user.password_hash.encode('utf-8')
        ):
            return jsonify({'error': 'Current password is incorrect'}), 400

        # Validate the new password's strength
        if len(data['new_password']) < 8:
            return jsonify({'error': 'New password must be at least 8 characters'}), 400

        # Update the password
        user.password_hash = bcrypt.hashpw(
            data['new_password'].encode('utf-8'),
            bcrypt.gensalt()
        ).decode('utf-8')

        db.session.commit()

        return jsonify({'message': 'Password changed successfully'}), 200

    # Script entry point: runs only when the file is executed directly.
    if __name__ == '__main__':
        # Create the tables inside an application context.
        with app.app_context():
            db.create_all()
        # NOTE(review): debug=True is presumably meant for local development
        # only — confirm before deploying.
        app.run(debug=True)
```

> **生成的代码包含**:

> - 用户注册功能
> - 用户登录功能
> - JWT 认证
> - 密码重置请求
> - 密码重置
> - 修改密码
> - 邮件发送
> - 数据验证
> - 错误处理

### 2. 从 API 规范生成代码

示例:从 OpenAPI 规范生成代码

用户请求: "根据这个 OpenAPI 规范生成 API 代码"

OpenAPI 规范:

```yaml

openapi: 3.0.0
info:
  title: Product API
  version: 1.0.0

paths:
  /products:
    get:
      summary: Get all products
      parameters:
        - name: page
          in: query
          schema:
            type: integer
        - name: per_page
          in: query
          schema:
            type: integer
      responses:
        '200':
          description: Success
          content:
            application/json:
              schema:
                type: object
                properties:
                  products:
                    type: array
                    items:
                      $ref: '#/components/schemas/Product'
                  total:
                    type: integer
                  page:
                    type: integer

    post:
      summary: Create a product
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ProductCreate'
      responses:
        '201':
          description: Created
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Product'

  /products/{product_id}:
    get:
      summary: Get a product
      parameters:
        - name: product_id
          in: path
          required: true
          schema:
            type: integer
      responses:
        '200':
          description: Success
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Product'

    put:
      summary: Update a product
      parameters:
        - name: product_id
          in: path
          required: true
          schema:
            type: integer
      requestBody:
        required: true
        content:
          application/json:
            schema:
              $ref: '#/components/schemas/ProductUpdate'
      responses:
        '200':
          description: Success
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/Product'

    delete:
      summary: Delete a product
      parameters:
        - name: product_id
          in: path
          required: true
          schema:
            type: integer
      responses:
        '204':
          description: No content

components:
  schemas:
    Product:
      type: object
      properties:
        id:
          type: integer
        name:
          type: string
        description:
          type: string
        price:
          type: number
        stock:
          type: integer
        category:
          type: string
        created_at:
          type: string
          format: date-time
        updated_at:
          type: string
          format: date-time

    ProductCreate:
      type: object
      required:
        - name
        - price
        - stock
        - category
      properties:
        name:
          type: string
        description:
          type: string
        price:
          type: number
        stock:
          type: integer
        category:
          type: string

    ProductUpdate:
      type: object
      properties:
        name:
          type: string
        description:
          type: string
        price:
          type: number
        stock:
          type: integer
        category:
          type: string

```

Claude Code 生成的代码:

    from flask import Flask, request, jsonify
    from flask_sqlalchemy import SQLAlchemy
    from datetime import datetime
    from typing import Optional

    # Create the Flask application and point SQLAlchemy at the SQLite file.
    app = Flask(__name__)
    app.config.update(SQLALCHEMY_DATABASE_URI='sqlite:///products.db')

    # Database handle used by the model and the route handlers.
    db = SQLAlchemy(app)

    class Product(db.Model):
        """Product model."""
        id = db.Column(db.Integer, primary_key=True)
        name = db.Column(db.String(200), nullable=False)
        description = db.Column(db.Text)
        price = db.Column(db.Float, nullable=False)
        stock = db.Column(db.Integer, nullable=False)
        category = db.Column(db.String(100), nullable=False)
        created_at = db.Column(db.DateTime, default=datetime.utcnow)
        updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

        def to_dict(self):
            """Return the product as a dict, with both timestamps as ISO 8601 strings."""
            plain_fields = ('id', 'name', 'description', 'price', 'stock', 'category')
            payload = {field: getattr(self, field) for field in plain_fields}
            # Timestamps are serialized separately because they need isoformat().
            payload['created_at'] = self.created_at.isoformat()
            payload['updated_at'] = self.updated_at.isoformat()
            return payload

    @app.route('/products', methods=['GET'])
    def get_products():
        """Return one page of products.

        Query parameters:
            page: Page number, defaults to 1.
            per_page: Page size, defaults to 20 and is capped at 100.

        Returns:
            JSON with ``products`` (list of product dicts), ``total`` and ``page``.
        """
        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 20, type=int)

        # Cap the page size so a client cannot request the whole table at once.
        pagination = Product.query.paginate(
            page=page,
            per_page=per_page,
            max_per_page=100
        )

        return jsonify({
            'products': [product.to_dict() for product in pagination.items],
            'total': pagination.total,
            'page': page
        })

    @app.route('/products', methods=['POST'])
    def create_product():
        """Create a product from the JSON request body.

        Requires ``name``, ``price``, ``stock`` and ``category``;
        ``description`` is optional.

        Returns:
            201 with the created product, or 400 with an ``error`` message
            when the body is not a JSON object, a required field is missing,
            or ``price``/``stock`` fail validation.
        """
        # silent=True returns None instead of aborting on a missing or
        # malformed body; lists and scalars are rejected as well.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        # Validate required fields
        for field in ('name', 'price', 'stock', 'category'):
            if field not in data:
                return jsonify({'error': f'Missing required field: {field}'}), 400

        # Validate data types. bool is a subclass of int, so JSON true/false
        # must be rejected explicitly.
        price = data['price']
        if isinstance(price, bool) or not isinstance(price, (int, float)) or price < 0:
            return jsonify({'error': 'Price must be a positive number'}), 400

        stock = data['stock']
        if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
            return jsonify({'error': 'Stock must be a non-negative integer'}), 400

        # Create the product
        product = Product(
            name=data['name'],
            description=data.get('description'),
            price=price,
            stock=stock,
            category=data['category']
        )

        db.session.add(product)
        db.session.commit()

        return jsonify(product.to_dict()), 201

    @app.route('/products/<int:product_id>', methods=['GET'])
    def get_product(product_id):
        """Return a single product as JSON; the lookup uses get_or_404."""
        return jsonify(Product.query.get_or_404(product_id).to_dict())

    @app.route('/products/<int:product_id>', methods=['PUT'])
    def update_product(product_id):
        """Update the fields of a product that are present in the JSON body.

        Recognized fields: ``name``, ``description``, ``price``, ``stock``,
        ``category``.

        Returns:
            The updated product, or 400 with an ``error`` message when the
            body is not a JSON object or ``price``/``stock`` fail validation.
            The lookup uses get_or_404.
        """
        product = Product.query.get_or_404(product_id)

        # silent=True returns None instead of aborting on a missing or
        # malformed body; lists and scalars are rejected as well.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        # Validate before touching the row so a rejected request leaves the
        # loaded object unmodified. bool is a subclass of int, so it is
        # rejected explicitly.
        if 'price' in data:
            price = data['price']
            if isinstance(price, bool) or not isinstance(price, (int, float)) or price < 0:
                return jsonify({'error': 'Price must be a positive number'}), 400

        if 'stock' in data:
            stock = data['stock']
            if isinstance(stock, bool) or not isinstance(stock, int) or stock < 0:
                return jsonify({'error': 'Stock must be a non-negative integer'}), 400

        # Apply the supplied fields
        for field in ('name', 'description', 'price', 'stock', 'category'):
            if field in data:
                setattr(product, field, data[field])

        product.updated_at = datetime.utcnow()
        db.session.commit()

        return jsonify(product.to_dict())

    @app.route('/products/<int:product_id>', methods=['DELETE'])
    def delete_product(product_id):
        """Delete a product and answer with an empty 204 response."""
        doomed = Product.query.get_or_404(product_id)

        session = db.session
        session.delete(doomed)
        session.commit()

        return '', 204

    # Script entry point: runs only when the file is executed directly.
    if __name__ == '__main__':
        # Create the tables inside an application context before serving.
        with app.app_context():
            db.create_all()
        # NOTE(review): debug=True is presumably meant for local development
        # only — confirm before deploying.
        app.run(debug=True)

> **生成的代码包含**:
>
> - 完整的 CRUD 操作
> - 数据验证
> - 分页功能
> - 错误处理
> - 符合 OpenAPI 规范

## 代码补全

### 1. 智能方法补全

示例:智能方法补全

用户输入:

```python

    class OrderService:
        """Order service skeleton used as the input of the completion example."""

        def __init__(self, db):
            """Keep a reference to the database handle."""
            self.db = db

        def create_order(self, user_id, items):
            """Create an order (not implemented yet; returns None)."""

```

Claude Code 自动补全:

```python

    class OrderService:
        """Order operations: create, fetch, update status, cancel and list."""

        def __init__(self, db):
            """Keep a reference to the database handle."""
            self.db = db

        def create_order(self, user_id, items):
            """
            Create an order and deduct stock for its items.

            Args:
                user_id: ID of the ordering user.
                items: Product lines, e.g. [{'product_id': 1, 'quantity': 2}, ...].

            Returns:
                The newly created order object.

            Raises:
                ValueError: If items is empty or a quantity is not positive.
                UserNotFoundError: If no user has the given ID.
                ProductNotFoundError: If a line references an unknown product.
                InsufficientStockError: If the requested quantity exceeds stock.
            """
            # Reject input that would create an empty order or, with a
            # non-positive quantity, add stock and lower the total.
            if not items:
                raise ValueError("Order must contain at least one item")
            for item in items:
                if item['quantity'] <= 0:
                    raise ValueError(
                        f"Quantity for product {item['product_id']} must be positive"
                    )

            # Validate the user
            user = self.db.session.query(User).filter_by(id=user_id).first()
            if not user:
                raise UserNotFoundError(f"User with id {user_id} not found")

            # Validate the products
            product_ids = [item['product_id'] for item in items]
            products = self.db.session.query(Product).filter(
                Product.id.in_(product_ids)
            ).all()

            product_map = {product.id: product for product in products}

            # Running total per product, so repeated lines for the same
            # product are checked against stock together.
            requested = {}
            for item in items:
                if item['product_id'] not in product_map:
                    raise ProductNotFoundError(
                        f"Product with id {item['product_id']} not found"
                    )

                product = product_map[item['product_id']]
                requested[product.id] = requested.get(product.id, 0) + item['quantity']
                if product.stock < requested[product.id]:
                    raise InsufficientStockError(
                        f"Insufficient stock for product {product.name}"
                    )

            # Compute the total amount
            total_amount = sum(
                product_map[item['product_id']].price * item['quantity']
                for item in items
            )

            # Create the order
            order = Order(
                user_id=user_id,
                total_amount=total_amount,
                status='pending'
            )

            self.db.session.add(order)
            # Flush so order.id is assigned before the order items reference it.
            self.db.session.flush()

            # Create the order items
            for item in items:
                product = product_map[item['product_id']]
                order_item = OrderItem(
                    order_id=order.id,
                    product_id=item['product_id'],
                    quantity=item['quantity'],
                    price=product.price
                )
                self.db.session.add(order_item)

                # Deduct stock
                product.stock -= item['quantity']

            self.db.session.commit()

            return order

        def get_order(self, order_id):
            """Return the order with the given ID; raise OrderNotFoundError if absent."""
            order = self.db.session.query(Order).filter_by(id=order_id).first()
            if not order:
                raise OrderNotFoundError(f"Order with id {order_id} not found")
            return order

        def update_order_status(self, order_id, status):
            """Set the order's status, refresh updated_at and return the order."""
            order = self.get_order(order_id)
            order.status = status
            order.updated_at = datetime.utcnow()
            self.db.session.commit()
            return order

        def cancel_order(self, order_id):
            """Cancel an order and restore the stock of its items.

            Raises:
                OrderNotFoundError: If the order does not exist.
                InvalidOrderStatusError: If the order is paid, shipped or completed.
            """
            order = self.get_order(order_id)

            if order.status in ['paid', 'shipped', 'completed']:
                raise InvalidOrderStatusError(
                    f"Cannot cancel order with status {order.status}"
                )

            # Restore stock; items whose product is no longer found are skipped
            for item in order.items:
                product = self.db.session.query(Product).filter_by(
                    id=item.product_id
                ).first()
                if product:
                    product.stock += item.quantity

            order.status = 'cancelled'
            order.updated_at = datetime.utcnow()
            self.db.session.commit()

            return order

        def get_user_orders(self, user_id, page=1, per_page=20):
            """Return a pagination object of the user's orders, newest first."""
            pagination = self.db.session.query(Order).filter_by(
                user_id=user_id
            ).order_by(
                Order.created_at.desc()
            ).paginate(page=page, per_page=per_page)

            return pagination


```

> **补全的内容**:
>
> - 完整的方法实现
> - 数据验证
> - 错误处理
> - 库存管理
> - 相关方法补全

### 2. 智能类补全

示例:智能类补全

用户输入:

```python
    class UserRepository:
        """User repository skeleton used as the input of the completion example."""

        def __init__(self, db):
            """Keep a reference to the database handle."""
            self.db = db
```

Claude Code 自动补全:

```python
    from typing import List, Optional
    from datetime import datetime

    class UserRepository:
        """User repository: lookup, persistence, existence checks and search."""

        def __init__(self, db):
            """Keep a reference to the database handle."""
            self.db = db

        def find_by_id(self, user_id: int) -> Optional[User]:
            """Find a user by ID; returns None when there is no match."""
            return self.db.session.query(User).filter_by(id=user_id).first()

        def find_by_username(self, username: str) -> Optional[User]:
            """Find a user by username; returns None when there is no match."""
            return self.db.session.query(User).filter_by(username=username).first()

        def find_by_email(self, email: str) -> Optional[User]:
            """Find a user by email; returns None when there is no match."""
            return self.db.session.query(User).filter_by(email=email).first()

        def find_all(self, page: int = 1, per_page: int = 20):
            """Return one page of all users."""
            return self.db.session.query(User).paginate(
                page=page,
                per_page=per_page
            )

        def create(self, user: User) -> User:
            """Add the user to the session, commit and return it."""
            self.db.session.add(user)
            self.db.session.commit()
            return user

        def update(self, user: User) -> User:
            """Refresh updated_at, commit and return the user."""
            user.updated_at = datetime.utcnow()
            self.db.session.commit()
            return user

        def delete(self, user_id: int) -> bool:
            """Delete the user with the given ID; returns False when not found."""
            user = self.find_by_id(user_id)
            if not user:
                return False

            self.db.session.delete(user)
            self.db.session.commit()
            return True

        def exists_by_username(self, username: str) -> bool:
            """Check whether a user with this username exists."""
            return self.db.session.query(User).filter_by(
                username=username
            ).first() is not None

        def exists_by_email(self, email: str) -> bool:
            """Check whether a user with this email exists."""
            return self.db.session.query(User).filter_by(
                email=email
            ).first() is not None

        def count(self) -> int:
            """Return the number of users."""
            return self.db.session.query(User).count()

        def search(self, keyword: str, page: int = 1, per_page: int = 20):
            """Return one page of users whose username or email contains the keyword (case-insensitive)."""
            return self.db.session.query(User).filter(
                User.username.ilike(f'%{keyword}%') |
                User.email.ilike(f'%{keyword}%')
            ).paginate(page=page, per_page=per_page)
```

> **补全的内容**:
>
> - 完整的 CRUD 方法
> - 查询方法
> - 存在性检查
> - 搜索功能
> - 分页支持

基于 MIT 许可发布