FastAPI + SQLAlchemy 数据库迁移最佳实践指南
📋 目录
概述
什么是数据库迁移?
数据库迁移是管理数据库架构变更的版本控制系统。它允许您:
跟踪数据库结构的所有变更
在团队间同步数据库架构
安全地升级和回滚数据库版本
自动化部署过程
为什么选择Alembic?
Alembic 是SQLAlchemy的官方迁移工具,相当于Django的migrations系统:
✅ 版本控制 :每个迁移都有唯一版本号
✅ 自动生成 :自动检测模型变更
✅ 双向迁移 :支持升级和降级
✅ 异步支持 :完全支持async/await
✅ 多数据库 :支持MySQL、PostgreSQL、SQLite等
环境准备
1. 安装依赖
1 2 3 4 5 6 7 pip install fastapi sqlalchemy alembic pip install aiomysql PyMySQL pip install asyncpg psycopg2-binary pip install aiosqlite
2. 项目结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 your-project/ ├── alembic.ini # Alembic配置文件 ├── migrations/ # 迁移目录 │ ├── env.py # 环境配置 │ ├── script.py.mako # 迁移模板 │ └── versions/ # 迁移版本文件 ├── core/ │ └── models/ # SQLAlchemy模型 │ ├── __init__.py │ ├── base.py # 基础模型 │ └── user.py # 具体模型 ├── config/ │ └── database.py # 数据库配置 └── main.py # FastAPI应用
Alembic配置
1. 初始化Alembic
1 2 alembic init migrations
2. 配置数据库连接
config/database.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 from pydantic_settings import BaseSettingsclass DatabaseSettings (BaseSettings ): """数据库配置类""" DB_TYPE: str = "mysql" MYSQL_USER: str = "root" MYSQL_PASSWORD: str = "password" MYSQL_HOST: str = "localhost" MYSQL_PORT: int = 3306 MYSQL_DB: str = "your_database" DB_POOL_SIZE: int = 5 DB_MAX_OVERFLOW: int = 10 DB_ECHO: bool = False @property def sync_database_url (self ) -> str : """同步数据库 URL""" if self .DB_TYPE.lower() == "mysql" : return ( f"mysql+pymysql://{self.MYSQL_USER} :{self.MYSQL_PASSWORD} @" f"{self.MYSQL_HOST} :{self.MYSQL_PORT} /{self.MYSQL_DB} " ) elif self .DB_TYPE.lower() == "sqlite" : return f"sqlite:///database.db" else : raise ValueError(f"不支持的数据库类型: {self.DB_TYPE} " ) @property def async_database_url (self ) -> str : """异步数据库 URL""" if self .DB_TYPE.lower() == "mysql" : return ( f"mysql+aiomysql://{self.MYSQL_USER} :{self.MYSQL_PASSWORD} @" f"{self.MYSQL_HOST} :{self.MYSQL_PORT} /{self.MYSQL_DB} " ) elif self .DB_TYPE.lower() == "sqlite" : return f"sqlite+aiosqlite:///database.db" else : raise ValueError(f"不支持的数据库类型: {self.DB_TYPE} " ) class Config : env_file = ".env" case_sensitive = True settings = DatabaseSettings()
3. 配置模型基类
core/models/base.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from datetime import datetimefrom typing import Any from sqlalchemy import MetaDatafrom sqlalchemy.ext.declarative import declared_attrfrom sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnconvention = { "ix" : "ix_%(column_0_label)s" , "uq" : "uq_%(table_name)s_%(column_0_name)s" , "ck" : "ck_%(table_name)s_%(constraint_name)s" , "fk" : "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s" , "pk" : "pk_%(table_name)s" , } metadata = MetaData(naming_convention=convention) class Base (DeclarativeBase ): """声明性基类""" metadata = metadata @declared_attr.directive def __tablename__ (cls ) -> str : """自动生成表名""" return cls.__name__.lower() id : Mapped[int ] = mapped_column(primary_key=True ) created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) updated_at: Mapped[datetime] = mapped_column( default=datetime.utcnow, onupdate=datetime.utcnow )
4. 配置Alembic环境
migrations/env.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 """Alembic 环境配置""" import asyncioimport sysfrom logging.config import fileConfigfrom pathlib import Pathfrom alembic import contextfrom sqlalchemy import poolfrom sqlalchemy.engine import Connectionfrom sqlalchemy.ext.asyncio import async_engine_from_configproject_root = Path(__file__).parent.parent sys.path.insert(0 , str (project_root)) from config.database import settingsfrom core.models import metadata config = context.config if config.config_file_name is not None : fileConfig(config.config_file_name) target_metadata = metadata def run_migrations_offline () -> None : """离线模式运行迁移""" url = settings.sync_database_url context.configure( url=url, target_metadata=target_metadata, literal_binds=True , dialect_opts={"paramstyle" : "named" }, ) with context.begin_transaction(): context.run_migrations() def do_run_migrations (connection: Connection ) -> None : """执行迁移""" context.configure(connection=connection, target_metadata=target_metadata) with context.begin_transaction(): context.run_migrations() async def run_async_migrations () -> None : """异步运行迁移""" configuration = config.get_section(config.config_ini_section) configuration["sqlalchemy.url" ] = settings.async_database_url connectable = async_engine_from_config( configuration, prefix="sqlalchemy." , poolclass=pool.NullPool, ) async with connectable.connect() as connection: await connection.run_sync(do_run_migrations) await connectable.dispose() def run_migrations_online () -> None : """在线模式运行迁移""" asyncio.run(run_async_migrations()) if context.is_offline_mode(): run_migrations_offline() else : run_migrations_online()
迁移工作流程
1. 创建模型
core/models/user.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from typing import Optional from sqlalchemy import Stringfrom sqlalchemy.orm import Mapped, mapped_columnfrom .base import Base class User (Base ): """用户模型""" username: Mapped[str ] = mapped_column(String(50 ), unique=True ) email: Mapped[Optional [str ]] = mapped_column(String(120 ), unique=True ) password_hash: Mapped[str ] = mapped_column(String(255 )) is_active: Mapped[bool ] = mapped_column(default=True )
⚠️ 重要:MySQL要求所有VARCHAR字段必须指定长度!
2. 生成迁移文件
1 2 3 4 5 alembic revision --autogenerate -m "Add user table"
3. 审查迁移文件
生成的迁移文件位于 migrations/versions/
,务必检查:
SQL语句是否正确
是否遗漏了字段或索引
数据迁移逻辑是否安全
4. 执行迁移
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 alembic upgrade head alembic upgrade <revision_id> alembic downgrade -1 alembic downgrade <revision_id>
5. 查看迁移状态
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 alembic current alembic history alembic history --verbose
最佳实践
1. 模型设计原则
✅ 正确做法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 username: Mapped[str ] = mapped_column(String(50 ), unique=True ) created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow) description: Mapped[Optional [str ]] = mapped_column(String(1000 )) from enum import Enum class UserStatus (str , Enum): ACTIVE = "active" INACTIVE = "inactive" status: Mapped[UserStatus] = mapped_column(default=UserStatus.ACTIVE)
❌ 避免的做法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 username: Mapped[str ] username = Column(String(50 )) username: Mapped[str ] = mapped_column(String(999 ))
2. 迁移文件命名
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 alembic revision --autogenerate -m "Add user authentication tables" alembic revision --autogenerate -m "Add index on user.email" alembic revision --autogenerate -m "Update user.password_hash length" alembic revision --autogenerate -m "update" alembic revision --autogenerate -m "fix" alembic revision --autogenerate -m "changes"
3. 数据迁移
对于复杂的数据变更,手动编写迁移文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 """ 增加用户全名字段并迁移现有数据 Revision ID: abc123def456 Create Date: 2024-01-01 12:00:00.000000 """ from alembic import opimport sqlalchemy as sa def upgrade () -> None : op.add_column('user' , sa.Column('full_name' , sa.String(100 ), nullable=True )) connection = op.get_bind() connection.execute( "UPDATE user SET full_name = CONCAT(first_name, ' ', last_name) " "WHERE first_name IS NOT NULL AND last_name IS NOT NULL" ) def downgrade () -> None : op.drop_column('user' , 'full_name' )
4. 索引管理
1 2 3 4 5 6 7 8 9 10 11 12 13 class User (Base ): email: Mapped[str ] = mapped_column(String(120 ), unique=True , index=True ) __table_args__ = ( Index('ix_user_email_active' , 'email' , 'is_active' ), )
5. 外键约束
1 2 3 4 5 6 7 8 9 10 11 class Post (Base ): title: Mapped[str ] = mapped_column(String(200 )) author_id: Mapped[int ] = mapped_column(ForeignKey("user.id" )) author = relationship("User" , back_populates="posts" )
数据库兼容性
MySQL 特殊考虑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 content: Mapped[str ] = mapped_column(String(5000 )) "mysql+aiomysql://user:pass@host/db?charset=utf8mb4" is_active: Mapped[bool ] = mapped_column(default=True )
PostgreSQL 特殊考虑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 content: Mapped[str ] = mapped_column(Text()) from sqlalchemy.dialects.postgresql import ARRAYtags: Mapped[list ] = mapped_column(ARRAY(String)) metadata: Mapped[dict ] = mapped_column(JSON)
SQLite 特殊考虑
生产环境部署
1. 部署前检查清单
[ ] 备份生产数据库
[ ] 在测试环境验证迁移
[ ] 检查迁移文件的SQL语句
[ ] 评估迁移执行时间
[ ] 准备回滚计划
2. 自动化部署脚本
scripts/deploy_migrations.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 """ 生产环境迁移部署脚本 """ import asyncioimport subprocessimport sysfrom datetime import datetime async def backup_database (): """备份数据库""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S" ) backup_file = f"backup_{timestamp} .sql" cmd = [ "mysqldump" , "-h" , "localhost" , "-u" , "user" , "-p" , "database_name" , ">" , backup_file ] print (f"创建数据库备份: {backup_file} " ) def run_migrations (): """执行迁移""" try : print ("开始执行数据库迁移..." ) result = subprocess.run( ["alembic" , "upgrade" , "head" ], capture_output=True , text=True , check=True ) print ("✅ 迁移执行成功!" ) print (result.stdout) return True except subprocess.CalledProcessError as e: print ("❌ 迁移执行失败!" ) print (e.stderr) return False async def main (): """主函数""" print ("=== 生产环境数据库迁移 ===" ) await backup_database() if run_migrations(): print ("🎉 部署完成!" ) else : print ("💥 部署失败,请检查错误信息" ) sys.exit(1 ) if __name__ == "__main__" : asyncio.run(main())
3. Docker环境部署
Dockerfile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 FROM python:3.11 WORKDIR /app COPY requirements.txt . RUN pip install -r requirements.txt COPY . . CMD ["sh" , "-c" , "alembic upgrade head && python main.py" ]
docker-compose.yml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 version: '3.8' services: app: build: . ports: - "8000:8000" environment: - DB_TYPE=mysql - MYSQL_HOST=db - MYSQL_DB=myapp depends_on: - db db: image: mysql:8.0 environment: MYSQL_ROOT_PASSWORD: password MYSQL_DATABASE: myapp volumes: - mysql_data:/var/lib/mysql volumes: mysql_data:
故障排除
常见问题及解决方案
1. “VARCHAR requires a length on dialect mysql”
问题 :MySQL要求VARCHAR字段指定长度
解决 :
1 2 3 4 5 6 7 8 9 10 11 description: Mapped[str ] description: Mapped[str ] = mapped_column(String(1000 ))
2. “No module named ‘core’”
问题 :迁移环境找不到项目模块
解决 :在 migrations/env.py
中添加路径:
1 2 3 4 5 6 7 8 9 10 11 import sysfrom pathlib import Path project_root = Path(__file__).parent.parent sys.path.insert(0 , str (project_root))
3. 迁移文件冲突
问题 :多人开发时迁移版本冲突
解决 :
1 2 3 4 5 alembic merge -m "Merge migration branches" head1 head2
4. 回滚迁移
问题 :需要撤销错误的迁移
解决 :
1 2 3 4 5 6 7 8 9 10 11 alembic downgrade -1 alembic downgrade abc123def456
5. 手动修复迁移状态
问题 :迁移状态与实际数据库不一致
解决 :
1 2 3 4 5 6 7 8 9 10 11 alembic stamp head alembic stamp abc123def456
实际示例
完整的用户管理模型
core/models/user.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 from datetime import datetimefrom enum import Enumfrom typing import Optional from sqlalchemy import String, Indexfrom sqlalchemy.orm import Mapped, mapped_column, relationship from .base import Base class UserRole (str , Enum): ADMIN = "admin" USER = "user" MODERATOR = "moderator" class User (Base ): """用户模型""" username: Mapped[str ] = mapped_column(String(50 ), unique=True , index=True ) email: Mapped[str ] = mapped_column(String(120 ), unique=True , index=True ) password_hash: Mapped[str ] = mapped_column(String(255 )) full_name: Mapped[Optional [str ]] = mapped_column(String(100 )) role: Mapped[UserRole] = mapped_column(default=UserRole.USER) is_active: Mapped[bool ] = mapped_column(default=True ) last_login: Mapped[Optional [datetime]] __table_args__ = ( Index('ix_user_email_active' , 'email' , 'is_active' ), Index('ix_user_role_active' , 'role' , 'is_active' ), ) def __repr__ (self ) -> str : return f"<User(username='{self.username} ', email='{self.email} ')>"
完整的迁移工作流程示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 alembic init migrations alembic revision --autogenerate -m "Initial migration: add user table" alembic upgrade head alembic revision --autogenerate -m "Add user.full_name field" alembic upgrade head alembic current alembic history alembic downgrade -1
总结
FastAPI + SQLAlchemy + Alembic 提供了强大而灵活的数据库迁移解决方案:
✅ 优势
类型安全 :完整的Python类型注解支持
异步支持 :原生async/await支持
多数据库 :支持主流数据库系统
自动化 :自动检测模型变更
版本控制 :完整的迁移版本管理
🎯 关键要点
模型设计 :明确指定字段类型和长度
迁移审查 :每次迁移前检查生成的SQL
测试验证 :在测试环境验证迁移
备份机制 :生产环境迁移前必须备份
监控回滚 :准备迁移失败的回滚方案
📚 进一步学习
祝您的数据库迁移工作顺利! 🚀