Python 操作 MySQL 数据库完整教程

1. 前言

在现代 Web 开发、数据爬取、自动化报表等场景中,relational-database依然是结构化数据管理的核心选择之一。其中 MySQL 凭借开源免费、生态成熟、性能稳定的优势,成为从入门到生产环境使用最广泛的数据库。

本教程采用 从 0 到可用,再到生产优化 的思路,用简洁实用的代码示例,带你快速掌握 Python 操作 MySQL 的核心技能。内容涵盖environment-setup、安全连接、CRUD、事务处理以及 ORM 实战,并附上生产级最佳实践。


2. 环境准备

2.1 快速部署 MySQL(Docker)

如果你不想在本机直接安装 MySQL 服务,使用 Docker 一键启动是最快捷的方案:

docker run --name local-mysql8 \
  -e MYSQL_ROOT_PASSWORD=123456 \
  -p 3306:3306 \
  -d mysql:8.0

启动后,可以通过 docker exec -it local-mysql8 mysql -uroot -p123456 进入容器,并提前创建测试数据库:

CREATE DATABASE IF NOT EXISTS spiders DEFAULT CHARACTER SET utf8mb4;

也可以使用 Navicat、DBeaver 等图形化工具连接。

2.2 安装 Python 驱动库

Python 操作 MySQL 常用的库有以下几种:

库名特点适用场景
PyMySQL纯 Python 实现,兼容性强,无编译依赖,安装简单入门学习、轻量级应用、跨平台环境
mysql-connector-pythonMySQL 官方提供的 Python 驱动,性能略优对性能有进一步要求、官方支持的项目
SQLAlchemyPython 最流行的 ORM 工具,可操作多款数据库,屏蔽 SQL 差异复杂业务逻辑、团队协作、多数据库项目

入门阶段首选 PyMySQL,后期可结合 SQLAlchemy 提升开发效率。安装基础库:

pip install pymysql cryptography

cryptography 用于支持 MySQL 8.0 默认的 caching_sha2_password 认证插件。


3. 安全高效的数据库连接

3.1 入门级:单次基础连接

这种连接方式适合快速验证和学习,切勿直接用于生产环境

import pymysql

try:
    db = pymysql.connect(
        host='localhost',
        user='root',
        password='123456',
        database='spiders',
        port=3306,
        charset='utf8mb4',                 # 必须用 utf8mb4,支持 Emoji 和完整 Unicode
        cursorclass=pymysql.cursors.DictCursor  # 返回字典,方便按列名读取
    )

    with db.cursor() as cursor:
        cursor.execute("SELECT VERSION()")
        result = cursor.fetchone()
        print(f"✅ 数据库连接成功,版本:{result['VERSION()']}")
finally:
    if db.open:
        db.close()
        print("🔌 连接已关闭")

3.2 生产级:使用连接池

频繁创建和销毁数据库连接开销极大,生产环境务必使用连接池。推荐 DBUtils

pip install dbutils

连接池配置示例:

from dbutils.pooled_db import PooledDB
import pymysql

# 全局连接池对象,只初始化一次
_pool = None

def init_pool():
    global _pool
    if _pool is None:
        _pool = PooledDB(
            creator=pymysql,
            maxconnections=10,    # 最大连接数,依据服务器配置调整
            mincached=2,         # 启动时保留的空闲连接数
            host='localhost',
            user='root',
            password='123456',
            database='spiders',
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
    return _pool

def get_connection():
    pool = init_pool()
    return pool.connection()

后续所有数据库操作都通过 get_connection() 获取连接,资源由连接池统一管理。


4. 核心 CRUD 操作

我们通过一个 students 表来演示增删改查。

4.1 初始化表结构

def create_students_table():
    sql = """
    CREATE TABLE IF NOT EXISTS students (
        id VARCHAR(255) NOT NULL COMMENT '学号',
        name VARCHAR(255) NOT NULL COMMENT '姓名',
        age INT NOT NULL COMMENT '年龄',
        gender VARCHAR(10) DEFAULT NULL COMMENT '性别',
        PRIMARY KEY (id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='学生信息表'
    """
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql)
    print("✅ 学生表创建/检查完成")

with get_connection() as conn 会自动处理事务:没有异常则提交,发生异常则回滚。

4.2 插入数据

4.2.1 单条插入

def insert_single_student():
    sql = "INSERT INTO students(id, name, age) VALUES(%s, %s, %s)"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, ('20240001', '张三', 20))
    print(f"✅ 插入成功,影响行数:{cursor.rowcount}")

⚠️ 占位符必须使用 %s千万不要用字符串拼接,否则有 SQL 注入风险!

4.2.2 批量插入(强烈推荐)

插入 1000 条数据,单条插入可能需要 1000 次网络交互,而批量插入只需 1 次左右。

def insert_batch_students():
    sql = "INSERT INTO students(id, name, age) VALUES(%s, %s, %s)"
    data = [
        ('20240002', '李四', 21),
        ('20240003', '王五', 22),
        ('20240004', '赵六', 20)
    ]
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.executemany(sql, data)
    print(f"✅ 批量插入成功,影响行数:{cursor.rowcount}")

4.2.3 存在则更新,不存在则插入(Upsert)

这是爬虫数据同步、幂等写入的必备操作。

def upsert_student():
    sql = """
    INSERT INTO students(id, name, age, gender) 
    VALUES(%s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE 
        name=VALUES(name), age=VALUES(age), gender=VALUES(gender)
    """
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, ('20240001', '张三三', 23, '男'))
    print(f"✅ Upsert 完成,影响行数:{cursor.rowcount}")

返回值规则:影响行数为 1 表示插入新记录,为 2 表示更新已有记录。


4.3 更新、删除、查询

更新数据

def update_student():
    sql = "UPDATE students SET age = %s WHERE name = %s"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, (21, '李四'))
    print("✅ 更新完成")

删除数据

def delete_student():
    sql = "DELETE FROM students WHERE age < %s"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, (21,))
    print(f"✅ 删除成功,影响行数:{cursor.rowcount}")

查询数据

单条查询

def query_one():
    sql = "SELECT id, name, age FROM students WHERE id = %s"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, ('20240001',))
            row = cursor.fetchone()
            if row:
                print(row)

多条查询

def query_all():
    sql = "SELECT id, name, age FROM students"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql)
            rows = cursor.fetchall()
            for row in rows:
                print(row)

分页查询

def query_page(page, page_size):
    offset = (page - 1) * page_size
    sql = "SELECT id, name, age FROM students LIMIT %s OFFSET %s"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, (page_size, offset))
            rows = cursor.fetchall()
            return rows

大数据量分页时,OFFSET 过大会导致性能下降,可改用基于自增主键的游标分页: WHERE id > last_id LIMIT page_size

大数据量流式查询

当结果集可能非常大时,避免一次性将所有数据加载到内存,可以使用流式游标:

def stream_query():
    sql = "SELECT id, name, age FROM students"
    with get_connection() as conn:
        # 使用 SSCursor(服务器端游标),逐条返回结果
        with conn.cursor(pymysql.cursors.SSCursor) as cursor:
            cursor.execute(sql)
            row = cursor.fetchone()
            while row:
                print(row)
                row = cursor.fetchone()

SSCursor 会将结果集留在服务器端,由客户端逐行读取,内存占用恒定,非常适合导出大量数据。


5. 事务处理:保证数据一致性

在转账、订单等关键业务中,需要一系列操作要么全部成功,要么全部失败(ACID 特性)。下面的转账示例演示事务的用法。

def transfer_money(from_account, to_account, amount):
    # 创建账户表(如果不存在)
    create_sql = """
    CREATE TABLE IF NOT EXISTS accounts (
        id VARCHAR(255) NOT NULL PRIMARY KEY,
        balance DECIMAL(10,2) NOT NULL DEFAULT 0.00
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """
    # 插入测试数据(幂等)
    insert_sql = """
    INSERT IGNORE INTO accounts(id, balance) VALUES
    ('A1001', 1000.00),
    ('A1002', 500.00);
    """

    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(create_sql)
            cursor.execute(insert_sql)

            try:
                # 检查余额
                cursor.execute("SELECT balance FROM accounts WHERE id = %s", (from_account,))
                row = cursor.fetchone()
                if not row or row['balance'] < amount:
                    raise ValueError("账户余额不足或账户不存在")

                # 扣款
                cursor.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, from_account)
                )

                # 模拟中途出错(可取消注释测试回滚)
                # raise Exception("网络中断!")

                # 存款
                cursor.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to_account)
                )

                print("✅ 转账成功")
            except Exception as e:
                # 任何步骤出错,回滚整个事务
                conn.rollback()
                print(f"❌ 转账失败,已回滚:{e}")

因为使用了 with 语句,即使不显式提交,在无异常退出时也会自动 commit();一旦发生异常,上下文管理器会执行 rollback()。显式调用 rollback() 可以让意图更清晰。


6. 进阶:使用 SQLAlchemy ORM 简化代码

手写 SQL 虽然灵活,但当业务逻辑复杂时,代码可读性和维护性会下降。SQLAlchemy 作为 Python 最流行的 ORM,让你像操作对象一样操作数据库。

安装:

pip install sqlalchemy pymysql

完整示例:

from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 1. 基类
Base = declarative_base()

# 2. 定义模型(映射到 students 表)
class StudentORM(Base):
    __tablename__ = 'students'

    id = Column(String(255), primary_key=True, comment='学号')
    name = Column(String(255), nullable=False, comment='姓名')
    age = Column(Integer, nullable=False, comment='年龄')
    gender = Column(String(10), comment='性别')

# 3. 创建引擎(内置连接池)
engine = create_engine(
    'mysql+pymysql://root:123456@localhost:3306/spiders',
    pool_size=10,          # 连接池大小
    pool_recycle=3600      # 防止连接被 MySQL 超时断开
)

# 4. 自动创建表(如果不存在)
Base.metadata.create_all(engine)

# 5. 创建会话工厂
SessionLocal = sessionmaker(bind=engine)

def test_orm():
    db = SessionLocal()
    try:
        # 插入
        new_student = StudentORM(id='20240005', name='钱七', age=22, gender='女')
        db.add(new_student)

        # 查询年龄 >= 20 的学生
        students = db.query(StudentORM).filter(StudentORM.age >= 20).all()
        print("✅ 查询结果:")
        for stu in students:
            print(stu.id, stu.name, stu.age)

        db.commit()
    except Exception as e:
        db.rollback()
        print(f"❌ ORM 操作失败:{e}")
    finally:
        db.close()

test_orm()

通过 ORM,查询、更新和删除都变成了直观的 Python 对象操作,大幅降低手写 SQL 的错误率。


7. 生产级最佳实践与避坑

在生产环境中使用 Python + MySQL,请务必遵守以下原则:

  1. 必须使用连接池:无论是 DBUtils 还是 SQLAlchemy 内置连接池,都能显著减少连接开销。
  2. 必须用参数化查询:所有用户输入都应通过 %s 占位符传入,杜绝 SQL 注入。
  3. 字符集统一使用 utf8mb4:支持 Emoji 和全 Unicode 字符,避免乱码。
  4. 合理使用事务:关键业务逻辑(如支付、库存扣减)必须包裹在事务中,确保数据一致性。
  5. 及时释放资源:充分利用 with 语句或 try-finally 关闭连接、游标。
  6. 批量操作提效:大量写入时优先使用 executemany() 或批量 INSERT 语句。
  7. 配置连接超时与自动重连:设置 pool_recycle(连接最大重用时间)和 connect_timeout,避免连接被闲置断开。
  8. 为常用查询字段添加索引:分析慢查询日志,针对性地优化 WHEREJOIN 等条件字段。
  9. 敏感信息脱钩:数据库密码等配置应从环境变量或配置中心读取,严禁硬编码。

8. 总结

本教程从基础连接入手,逐步讲解了如何在 Python 中安全、高效地操作 MySQL:

  • 快速上手:PyMySQL 单次连接,适合学习与小工具。
  • 生产方案:PyMySQL + DBUtils 连接池,适合轻量级后端服务。
  • 进阶方式:SQLAlchemy ORM,适合复杂业务系统,大幅提升开发效率。

掌握这些内容,足以让你应对绝大多数 Python 与 MySQL 的数据交互场景。配合事务、批量操作和资源管理的良好习惯,可以写出既稳健又高效的数据库代码。