FastAPI + SQLAlchemy 数据库迁移最佳实践指南

📋 目录

概述

什么是数据库迁移?

数据库迁移是管理数据库架构变更的版本控制系统。它允许您:

  • 跟踪数据库结构的所有变更
  • 在团队间同步数据库架构
  • 安全地升级和回滚数据库版本
  • 自动化部署过程

为什么选择Alembic?

Alembic 是SQLAlchemy的官方迁移工具,相当于Django的migrations系统:
版本控制:每个迁移都有唯一版本号  
自动生成:自动检测模型变更  
双向迁移:支持升级和降级  
异步支持:完全支持async/await  
多数据库:支持MySQL、PostgreSQL、SQLite等

环境准备

1. 安装依赖

1
2
3
4
5
6
7
# 核心依赖
pip install fastapi sqlalchemy alembic

# 数据库驱动(根据需要选择)
pip install aiomysql PyMySQL          # MySQL
pip install asyncpg psycopg2-binary   # PostgreSQL  
pip install aiosqlite                 # SQLite

2. 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
your-project/
├── alembic.ini                 # Alembic配置文件
├── migrations/                 # 迁移目录
│   ├── env.py                 # 环境配置
│   ├── script.py.mako         # 迁移模板
│   └── versions/              # 迁移版本文件
├── core/
│   └── models/                # SQLAlchemy模型
│       ├── __init__.py
│       ├── base.py           # 基础模型
│       └── user.py           # 具体模型
├── config/
│   └── database.py           # 数据库配置
└── main.py                   # FastAPI应用

Alembic配置

1. 初始化Alembic

1
2
# 在项目根目录执行
alembic init migrations

2. 配置数据库连接

config/database.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from pydantic_settings import BaseSettings

class DatabaseSettings(BaseSettings):
    """数据库配置类"""
    # 数据库类型: 'mysql', 'sqlite', 'postgresql'
    DB_TYPE: str = "mysql"

    # MySQL 配置
    MYSQL_USER: str = "root"
    MYSQL_PASSWORD: str = "password"
    MYSQL_HOST: str = "localhost"
    MYSQL_PORT: int = 3306
    MYSQL_DB: str = "your_database"

    # 连接池配置
    DB_POOL_SIZE: int = 5
    DB_MAX_OVERFLOW: int = 10
    DB_ECHO: bool = False

    @property
    def sync_database_url(self) -> str:
        """同步数据库 URL"""
        if self.DB_TYPE.lower() == "mysql":
            return (
                f"mysql+pymysql://{self.MYSQL_USER}:{self.MYSQL_PASSWORD}@"
                f"{self.MYSQL_HOST}:{self.MYSQL_PORT}/{self.MYSQL_DB}"
            )
        elif self.DB_TYPE.lower() == "sqlite":
            return f"sqlite:///database.db"
        else:
            raise ValueError(f"不支持的数据库类型: {self.DB_TYPE}")

    @property
    def async_database_url(self) -> str:
        """异步数据库 URL"""
        if self.DB_TYPE.lower() == "mysql":
            return (
                f"mysql+aiomysql://{self.MYSQL_USER}:{self.MYSQL_PASSWORD}@"
                f"{self.MYSQL_HOST}:{self.MYSQL_PORT}/{self.MYSQL_DB}"
            )
        elif self.DB_TYPE.lower() == "sqlite":
            return f"sqlite+aiosqlite:///database.db"
        else:
            raise ValueError(f"不支持的数据库类型: {self.DB_TYPE}")

    class Config:
        env_file = ".env"
        case_sensitive = True

settings = DatabaseSettings()

3. 配置模型基类

core/models/base.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from datetime import datetime

from typing import Any

from sqlalchemy import MetaData
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

# 命名约定
convention = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}

metadata = MetaData(naming_convention=convention)

class Base(DeclarativeBase):
    """声明性基类"""
    metadata = metadata

    @declared_attr.directive
    def __tablename__(cls) -> str:
        """自动生成表名"""
        return cls.__name__.lower()

    # 共同字段
    id: Mapped[int] = mapped_column(primary_key=True)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        default=datetime.utcnow, onupdate=datetime.utcnow
    )

4. 配置Alembic环境

migrations/env.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""Alembic 环境配置"""
import asyncio
import sys
from logging.config import fileConfig
from pathlib import Path

from alembic import context
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config

# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))



# 导入项目模块
from config.database import settings
from core.models import metadata



# Alembic配置对象
config = context.config

# 配置日志
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# 设置目标元数据
target_metadata = metadata

def run_migrations_offline() -> None:
    """离线模式运行迁移"""
    url = settings.sync_database_url
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    """执行迁移"""
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    """异步运行迁移"""
    configuration = config.get_section(config.config_ini_section)
    configuration["sqlalchemy.url"] = settings.async_database_url
    connectable = async_engine_from_config(
        configuration,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def run_migrations_online() -> None:

    """在线模式运行迁移"""

    asyncio.run(run_async_migrations())



# 根据模式选择执行方式

if context.is_offline_mode():

    run_migrations_offline()

else:

    run_migrations_online()

迁移工作流程

1. 创建模型

core/models/user.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

from typing import Optional

from sqlalchemy import String

from sqlalchemy.orm import Mapped, mapped_column

from .base import Base



class User(Base):

    """用户模型"""

    username: Mapped[str] = mapped_column(String(50), unique=True)

    email: Mapped[Optional[str]] = mapped_column(String(120), unique=True)

    password_hash: Mapped[str] = mapped_column(String(255))

    is_active: Mapped[bool] = mapped_column(default=True)

⚠️ 重要:MySQL要求所有VARCHAR字段必须指定长度!

2. 生成迁移文件

1
2
3
4
5

# 自动检测模型变更并生成迁移

alembic revision --autogenerate -m "Add user table"

3. 审查迁移文件

生成的迁移文件位于 migrations/versions/,务必检查:

  • SQL语句是否正确

  • 是否遗漏了字段或索引

  • 数据迁移逻辑是否安全

4. 执行迁移

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 升级到最新版本

alembic upgrade head



# 升级到特定版本

alembic upgrade <revision_id>



# 降级到前一个版本

alembic downgrade -1



# 降级到特定版本

alembic downgrade <revision_id>

5. 查看迁移状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 查看当前版本

alembic current



# 查看迁移历史

alembic history



# 查看详细历史

alembic history --verbose

最佳实践

1. 模型设计原则

✅ 正确做法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

# 明确指定字符串长度

username: Mapped[str] = mapped_column(String(50), unique=True)



# 使用类型注解

created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)



# 明确可选字段

description: Mapped[Optional[str]] = mapped_column(String(1000))



# 使用枚举类型

from enum import Enum



class UserStatus(str, Enum):

    ACTIVE = "active"

    INACTIVE = "inactive"



status: Mapped[UserStatus] = mapped_column(default=UserStatus.ACTIVE)

❌ 避免的做法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 不指定字符串长度(MySQL会报错)

username: Mapped[str]



# 不使用类型注解

username = Column(String(50))



# 使用魔术数字

username: Mapped[str] = mapped_column(String(999))

2. 迁移文件命名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 好的命名

alembic revision --autogenerate -m "Add user authentication tables"

alembic revision --autogenerate -m "Add index on user.email"

alembic revision --autogenerate -m "Update user.password_hash length"



# 避免的命名

alembic revision --autogenerate -m "update"

alembic revision --autogenerate -m "fix"

alembic revision --autogenerate -m "changes"

3. 数据迁移

对于复杂的数据变更,手动编写迁移文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

"""

增加用户全名字段并迁移现有数据

Revision ID: abc123def456

Create Date: 2024-01-01 12:00:00.000000

"""

from alembic import op

import sqlalchemy as sa



def upgrade() -> None:

    # 添加新字段

    op.add_column('user', sa.Column('full_name', sa.String(100), nullable=True))

    # 数据迁移

    connection = op.get_bind()

    connection.execute(

        "UPDATE user SET full_name = CONCAT(first_name, ' ', last_name) "

        "WHERE first_name IS NOT NULL AND last_name IS NOT NULL"

    )



def downgrade() -> None:

    op.drop_column('user', 'full_name')

4. 索引管理

1
2
3
4
5
6
7
8
9
10
11
12
13

# 在模型中定义索引

class User(Base):

    email: Mapped[str] = mapped_column(String(120), unique=True, index=True)

    __table_args__ = (

        Index('ix_user_email_active', 'email', 'is_active'),

    )

5. 外键约束

1
2
3
4
5
6
7
8
9
10
11

class Post(Base):

    title: Mapped[str] = mapped_column(String(200))

    author_id: Mapped[int] = mapped_column(ForeignKey("user.id"))

    # 关系定义

    author = relationship("User", back_populates="posts")

数据库兼容性

MySQL 特殊考虑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 1. 所有VARCHAR必须指定长度

content: Mapped[str] = mapped_column(String(5000))  # 而不是 Text()



# 2. 使用正确的字符集

# 在连接URL中指定

"mysql+aiomysql://user:pass@host/db?charset=utf8mb4"



# 3. 布尔字段

is_active: Mapped[bool] = mapped_column(default=True)  # 会变成TINYINT(1)

PostgreSQL 特殊考虑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 1. 可以使用Text类型

content: Mapped[str] = mapped_column(Text())



# 2. 数组字段

from sqlalchemy.dialects.postgresql import ARRAY

tags: Mapped[list] = mapped_column(ARRAY(String))



# 3. JSON字段

metadata: Mapped[dict] = mapped_column(JSON)

SQLite 特殊考虑

1
2
3
4
5
6
7

# 1. 没有严格的类型检查

# 2. 不支持某些ALTER TABLE操作

# 3. 适合开发和测试环境

生产环境部署

1. 部署前检查清单

  • [ ] 备份生产数据库

  • [ ] 在测试环境验证迁移

  • [ ] 检查迁移文件的SQL语句

  • [ ] 评估迁移执行时间

  • [ ] 准备回滚计划

2. 自动化部署脚本

scripts/deploy_migrations.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109

"""

生产环境迁移部署脚本

"""

import asyncio

import subprocess

import sys

from datetime import datetime



async def backup_database():

    """备份数据库"""

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    backup_file = f"backup_{timestamp}.sql"

    cmd = [

        "mysqldump",

        "-h", "localhost",

        "-u", "user",

        "-p",

        "database_name",

        ">", backup_file

    ]

    print(f"创建数据库备份: {backup_file}")

    # 执行备份命令

def run_migrations():

    """执行迁移"""

    try:

        print("开始执行数据库迁移...")

        result = subprocess.run(

            ["alembic", "upgrade", "head"],

            capture_output=True,

            text=True,

            check=True

        )

        print("✅ 迁移执行成功!")

        print(result.stdout)

        return True

    except subprocess.CalledProcessError as e:

        print("❌ 迁移执行失败!")

        print(e.stderr)

        return False



async def main():

    """主函数"""

    print("=== 生产环境数据库迁移 ===")

    # 1. 备份数据库

    await backup_database()

    # 2. 执行迁移

    if run_migrations():

        print("🎉 部署完成!")

    else:

        print("💥 部署失败,请检查错误信息")

        sys.exit(1)



if __name__ == "__main__":

    asyncio.run(main())

3. Docker环境部署

Dockerfile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

FROM python:3.11



WORKDIR /app

COPY requirements.txt .

RUN pip install -r requirements.txt



COPY . .



# 运行迁移和启动应用

CMD ["sh", "-c", "alembic upgrade head && python main.py"]

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

version: '3.8'

services:

  app:

    build: .

    ports:

      - "8000:8000"

    environment:

      - DB_TYPE=mysql

      - MYSQL_HOST=db

      - MYSQL_DB=myapp

    depends_on:

      - db

  db:

    image: mysql:8.0

    environment:

      MYSQL_ROOT_PASSWORD: password

      MYSQL_DATABASE: myapp

    volumes:

      - mysql_data:/var/lib/mysql

volumes:

  mysql_data:

故障排除

常见问题及解决方案

1. “VARCHAR requires a length on dialect mysql”

问题:MySQL要求VARCHAR字段指定长度

解决

1
2
3
4
5
6
7
8
9
10
11

# 错误

description: Mapped[str]



# 正确

description: Mapped[str] = mapped_column(String(1000))

2. “No module named ‘core’”

问题:迁移环境找不到项目模块

解决:在 migrations/env.py 中添加路径:

1
2
3
4
5
6
7
8
9
10
11

import sys

from pathlib import Path



project_root = Path(__file__).parent.parent

sys.path.insert(0, str(project_root))

3. 迁移文件冲突

问题:多人开发时迁移版本冲突

解决

1
2
3
4
5

# 合并迁移分支

alembic merge -m "Merge migration branches" head1 head2

4. 回滚迁移

问题:需要撤销错误的迁移

解决

1
2
3
4
5
6
7
8
9
10
11

# 回滚到前一个版本

alembic downgrade -1



# 回滚到特定版本

alembic downgrade abc123def456

5. 手动修复迁移状态

问题:迁移状态与实际数据库不一致

解决

1
2
3
4
5
6
7
8
9
10
11

# 标记当前版本(不执行SQL)

alembic stamp head



# 标记特定版本

alembic stamp abc123def456

实际示例

完整的用户管理模型

core/models/user.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

from datetime import datetime

from enum import Enum

from typing import Optional



from sqlalchemy import String, Index

from sqlalchemy.orm import Mapped, mapped_column, relationship



from .base import Base



class UserRole(str, Enum):

    ADMIN = "admin"

    USER = "user"

    MODERATOR = "moderator"



class User(Base):

    """用户模型"""

    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)

    email: Mapped[str] = mapped_column(String(120), unique=True, index=True)

    password_hash: Mapped[str] = mapped_column(String(255))

    full_name: Mapped[Optional[str]] = mapped_column(String(100))

    role: Mapped[UserRole] = mapped_column(default=UserRole.USER)

    is_active: Mapped[bool] = mapped_column(default=True)

    last_login: Mapped[Optional[datetime]]

    # 复合索引

    __table_args__ = (

        Index('ix_user_email_active', 'email', 'is_active'),

        Index('ix_user_role_active', 'role', 'is_active'),

    )

    def __repr__(self) -> str:

        return f"<User(username='{self.username}', email='{self.email}')>"

完整的迁移工作流程示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

# 1. 初始化项目

alembic init migrations



# 2. 创建初始迁移

alembic revision --autogenerate -m "Initial migration: add user table"



# 3. 执行迁移

alembic upgrade head



# 4. 添加新字段

# 修改模型后...

alembic revision --autogenerate -m "Add user.full_name field"



# 5. 执行新迁移

alembic upgrade head



# 6. 查看状态

alembic current

alembic history



# 7. 如需回滚

alembic downgrade -1

总结

FastAPI + SQLAlchemy + Alembic 提供了强大而灵活的数据库迁移解决方案:

✅ 优势

  • 类型安全:完整的Python类型注解支持

  • 异步支持:原生async/await支持

  • 多数据库:支持主流数据库系统

  • 自动化:自动检测模型变更

  • 版本控制:完整的迁移版本管理

🎯 关键要点

  1. 模型设计:明确指定字段类型和长度

  2. 迁移审查:每次迁移前检查生成的SQL

  3. 测试验证:在测试环境验证迁移

  4. 备份机制:生产环境迁移前必须备份

  5. 监控回滚:准备迁移失败的回滚方案

📚 进一步学习


祝您的数据库迁移工作顺利! 🚀