Python 操作 MySQL 数据库完整教程

1. 前言

在现代 Web 开发、数据爬取、自动化报表等场景中,关系型数据库依然是数据存储与结构化管理的核心选择之一——尤其是MySQL** 凭借开源免费、生态完善的特性,成为绝大多数入门和生产环境的标配。

本教程将从0到可用到生产优化的思路,用简洁实用的方式,带你快速上手 Python 操作 MySQL,涵盖基础连接、CRUD、事务、ORM 等常用功能,附生产级最佳实践避坑。


2. 环境准备

先花1分钟,把本地开发环境搭起来。

2.1 快速部署 MySQL(Docker)

不想装原生 MySQL 怕搞乱本地环境?Docker 一键起是最推荐的方案:

# 拉取镜像、运行、配置密码、端口映射(映射到本地 3306
docker run --name local-mysql8 \
-e MYSQL_ROOT_PASSWORD=123456 \
-p 3306:3306 \
-d mysql:8.0

启动后,**记得进入容器或用 Navicat/DBeaver 先手动建个测试库 spiders(或者代码里如果建也行,但习惯手动快速测前先搞个总库)。

2.2 安装 Python 驱动库

Python 生态里目前有几个主流选项:

库名特点适用场景
PyMySQL纯 Python 实现,兼容性强,无需编译依赖少入门学习、轻量级应用、跨平台环境
mysql-connector-pythonMySQL 官方 Python 驱动,性能略优对性能要求稍高、官方支持的场景
SQLAlchemyPython 最流行的 ORM 工具,支持多数据库复杂业务逻辑、需要避免手写 SQL 灵活性的项目

入门首选 PyMySQL,生产用 SQLAlchemy 可选。 先装基础库:

# cryptography 用于加密连接 MySQL 8.0 默认的 caching_sha2_password
pip install pymysql cryptography

3. 安全高效的数据库连接

3.1 入门级:单次基础连接

最简单的测试用连接,但**绝对不要直接用在生产环境!

import pymysql

# 单次连接(测试时临时用
try:
    # 连接参数说明
    db = pymysql.connect(
        host='localhost',
        user='root',
        password='123456',
        database='spiders',
        port=3306,
        charset='utf8mb4',  # 必须是这个!支持 emoji/完整 Unicode
        cursorclass=pymysql.cursors.DictCursor  # 返回字典,不是元组,更方便
    )
    
    # 测试连接
    with db.cursor() as cursor:
        cursor.execute("SELECT VERSION()")
        result = cursor.fetchone()
        print(f"✅ 数据库连接成功,版本:{result['VERSION()']}")
finally:
    # 用完必须关闭!
    if db.open:
        db.close()
        print("🔌 连接已关闭")

3.2 生产级:使用连接池

单次连接的创建/销毁成本极高,生产环境必须用连接池。推荐用 DBUtils: 先安装:

pip install dbutils

连接池代码:

from dbutils.pooled_db import PooledDB
import pymysql

# 全局初始化连接池(只初始化一次!
_pool = None

def init_pool():
    global _pool
    if _pool is None:
        _pool = PooledDB(
            creator=pymysql,
            maxconnections=10,  # 最大连接数(可根据服务器配置调整
            mincached=2,        # 初始化时至少保留的空闲连接
            host='localhost',
            user='root',
            password='123456',
            database='spiders',
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
    return _pool

def get_connection():
    # 获取一个连接
    pool = init_pool()
    return pool.connection()

之后所有数据库操作都用 get_connection() 获取连接。


4. 核心 CRUD 操作

先把核心操作封装几个常用的学生表来演示。

4.1 初始化表结构

记得先有测试库 spiders,这里用连接池创建表:

def create_students_table():
    sql = """
    CREATE TABLE IF NOT EXISTS students (
        id VARCHAR(255) NOT NULL COMMENT '学号',
        name VARCHAR(255) NOT NULL COMMENT '姓名',
        age INT NOT NULL COMMENT '年龄',
        gender VARCHAR(10) DEFAULT NULL COMMENT '性别',
        PRIMARY KEY (id)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='学生信息表'
    """
    # with 自动管理连接和提交回滚
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql)
    print("✅ 学生表创建/检查完成")

注意:PyMySQL 的 with 语句包裹连接,只会在退出时自动提交(前提是没有异常;有异常自动回滚)。

4.2 插入数据

4.2.1 单条插入

def insert_single_student():
    sql = "INSERT INTO students(id, name, age) VALUES(%s, %s, %s)"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            # %s 是参数占位符,千万不能用字符串拼接!防 SQL 注入
            cursor.execute(sql, ('20240001', '张三', 20))
    print(f"✅ 插入成功,影响行数:{cursor.rowcount}")

4.2.2 批量插入(性能强推!

单条插入 1000 条可能要 1000 次 IO,批量只要 1 次左右!

def insert_batch_students():
    sql = "INSERT INTO students(id, name, age) VALUES(%s, %s, %s)"
    # 批量数据
    data = [
        ('20240002', '李四', 21),
        ('20240003', '王五', 22),
        ('20240004', '赵六', 20)
    ]
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.executemany(sql, data)
    print(f"✅ 批量插入成功,影响行数:{cursor.rowcount}")

4.2.3 存在则更新,不存在则插入(Upsert)

爬虫、同步数据必备!

def upsert_student():
    sql = """
    INSERT INTO students(id, name, age, gender) 
    VALUES(%s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE 
    name=VALUES(name), age=VALUES(age), gender=VALUES(gender)
    """
    with get_connection() as conn:
        with conn.cursor() as cursor:
            # 如果学号 20240001 存在,更新姓名年龄性别;不存在插入
            cursor.execute(sql, ('20240001', '张三三', 23, '男'))
    print(f"✅ Upsert 完成,影响行数:{cursor.rowcount}")

提示:ON DUPLICATE KEY UPDATE 后,影响行数:1 是插入,2 是更新。


4.3 更新、删除、查询

4.3.1 更新

def update_student():
    sql = "UPDATE students SET age = %s WHERE name = %s"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, (21, '李四'))

4.3.2 删除

def delete_student():
    sql = "DELETE FROM students WHERE age < %s"
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, (21,))
    print(f"✅ 删除成功,影响行数:{cursor.rowcount}")

4.3.3 查询

单条、多条、分页、大数据流式查询(大数据量不一次性加载)
def query_students():
    # 1. 单条查询 fetchone()
    # 2. 多条查询 fetchall()
    # 3. 分页查询(常用
    # 4. 大数据量流式查询(不要用 fetchall() 一次性拉到内存
    pass

(篇幅有限,**分页和流式查询可以参考原代码核心,这里提重点:

  • 分页用 LIMIT page_size OFFSET offset,注意 offset 大的时候性能下降,可优化为 WHERE id > last_id LIMIT page_size
  • 流式查询直接用 fetchone() 循环,或者用 pymysql.cursors.SSCursor 替代普通游标。

5. 事务处理:保证数据一致性

事务的 ACID 特性在转账、订单、同步等关键操作中必不可少——要么全成功,要么全失败。

def transfer_money(from_account, to_account, amount):
    # 先创建测试转账表
    create_accounts_sql = """
    CREATE TABLE IF NOT EXISTS accounts (
        id VARCHAR(255) NOT NULL PRIMARY KEY,
        balance DECIMAL(10,2) NOT NULL DEFAULT 0.00
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """
    # 插入测试数据
    insert_test_data_sql = """
    INSERT IGNORE INTO accounts(id, balance) VALUES
    ('A1001', 1000.00),
    ('A1002', 500.00);
    """
    
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute(create_accounts_sql)
            cursor.execute(insert_test_data_sql)
            # 开始核心转账
            try:
                # 检查余额
                cursor.execute("SELECT balance FROM accounts WHERE id = %s", (from_account,))
                from_balance = cursor.fetchone()['balance']
                if from_balance < amount:
                    raise ValueError(f"账户余额不足!当前余额:{from_balance}")
                
                # 扣款
                cursor.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, from_account)
                )
                
                # 模拟出错(测试回滚)
                # raise Exception("网络中断!")
                
                # 存款
                cursor.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to_account)
                )
                print(f"✅ 转账成功")
            except Exception as e:
                # 这里手动回滚(with 自动回滚,但显式更清楚,或者加逻辑判断
                conn.rollback()
                print(f"❌ 转账失败,已回滚:{e}")

6. 进阶:用 SQLAlchemy ORM 简化代码

手写 SQL 虽然灵活,但复杂业务逻辑容易出错。SQLAlchemy 是 Python 最流行的 ORM,把数据库表映射成 Python 类,操作像操作对象。

先安装:

pip install sqlalchemy pymysql

代码示例:

from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 1. 定义基类
Base = declarative_base()

# 2. 映射表(对应之前的 students 表
class StudentORM(Base):
    __tablename__ = 'students'
    
    id = Column(String(255), primary_key=True, comment='学号')
    name = Column(String(255), nullable=False, comment='姓名')
    age = Column(Integer, nullable=False, comment='年龄')
    gender = Column(String(10), comment='性别')

# 3. 创建引擎(连接字符串格式:dialect+driver://user:pass@host:port/db
engine = create_engine(
    'mysql+pymysql://root:123456@localhost:3306/spiders',
    pool_size=10,  # 内置连接池参数
    pool_recycle=3600  # 避免 MySQL 默认 8 小时空闲断开
)

# 4. 创建表(如果不存在)
Base.metadata.create_all(engine)

# 5. 创建会话工厂(Session
SessionLocal = sessionmaker(bind=engine)

# 测试 ORM 操作
def test_orm():
    db = SessionLocal()
    try:
        # 插入
        new_stu = StudentORM(id='20240005', name='钱七', age=22, gender='女')
        db.add(new_stu)
        
        # 查询所有 >=20岁的学生
        students = db.query(StudentORM).filter(StudentORM.age >= 20).all()
        print("✅ 查询结果:")
        for stu in students:
            print(stu.id, stu.name, stu.age)
        
        db.commit()
    except Exception as e:
        db.rollback()
        print(f"❌ ORM 操作失败:{e}")
    finally:
        db.close()

test_orm()

7. 生产级最佳实践与避坑

  1. 必须用连接池:DBUtils 或者 SQLAlchemy 内置连接池
  2. 必须用参数化查询:防 SQL 注入(%s 占位符
  3. **字符集统一用 utf8mb4:支持 emoji/完整 Unicode
  4. 事务 ACID:关键操作加事务
  5. 资源释放:用 with 语句或显式关闭连接/会话
  6. 批量操作:大数据量用批量插入/更新
  7. 连接超时与重连:配置 pool_recycle/connect_timeout 等参数
  8. 索引优化:对常用查询的字段加索引

8. 总结

本教程从基础到进阶,覆盖了 Python 操作 MySQL 的常用功能。 入门学习用 PyMySQL 单次连接测试, 轻量级生产用 PyMySQL + DBUtils 连接池, 复杂业务逻辑用 SQLAlchemy ORM。 掌握这些,足以应对绝大多数 Python 与 MySQL 交互的场景!