Python 操作 MySQL 数据库完整教程
1. 前言
在现代 Web 开发、数据爬取、自动化报表等场景中,关系型数据库依然是数据存储与结构化管理的核心选择之一——尤其是MySQL** 凭借开源免费、生态完善的特性,成为绝大多数入门和生产环境的标配。
本教程将从0到可用到生产优化的思路,用简洁实用的方式,带你快速上手 Python 操作 MySQL,涵盖基础连接、CRUD、事务、ORM 等常用功能,附生产级最佳实践避坑。
2. 环境准备
先花1分钟,把本地开发环境搭起来。
2.1 快速部署 MySQL(Docker)
不想装原生 MySQL 怕搞乱本地环境?Docker 一键起是最推荐的方案:
# 拉取镜像、运行、配置密码、端口映射(映射到本地 3306
docker run --name local-mysql8 \
-e MYSQL_ROOT_PASSWORD=123456 \
-p 3306:3306 \
-d mysql:8.0
启动后,**记得进入容器或用 Navicat/DBeaver 先手动建个测试库 spiders(或者代码里如果建也行,但习惯手动快速测前先搞个总库)。
2.2 安装 Python 驱动库
Python 生态里目前有几个主流选项:
入门首选 PyMySQL,生产用 SQLAlchemy 可选。
先装基础库:
# cryptography 用于加密连接 MySQL 8.0 默认的 caching_sha2_password
pip install pymysql cryptography
3. 安全高效的数据库连接
3.1 入门级:单次基础连接
最简单的测试用连接,但**绝对不要直接用在生产环境!
import pymysql
# 单次连接(测试时临时用
try:
# 连接参数说明
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='spiders',
port=3306,
charset='utf8mb4', # 必须是这个!支持 emoji/完整 Unicode
cursorclass=pymysql.cursors.DictCursor # 返回字典,不是元组,更方便
)
# 测试连接
with db.cursor() as cursor:
cursor.execute("SELECT VERSION()")
result = cursor.fetchone()
print(f"✅ 数据库连接成功,版本:{result['VERSION()']}")
finally:
# 用完必须关闭!
if db.open:
db.close()
print("🔌 连接已关闭")
3.2 生产级:使用连接池
单次连接的创建/销毁成本极高,生产环境必须用连接池。推荐用 DBUtils:
先安装:
连接池代码:
from dbutils.pooled_db import PooledDB
import pymysql
# 全局初始化连接池(只初始化一次!
_pool = None
def init_pool():
global _pool
if _pool is None:
_pool = PooledDB(
creator=pymysql,
maxconnections=10, # 最大连接数(可根据服务器配置调整
mincached=2, # 初始化时至少保留的空闲连接
host='localhost',
user='root',
password='123456',
database='spiders',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
return _pool
def get_connection():
# 获取一个连接
pool = init_pool()
return pool.connection()
之后所有数据库操作都用 get_connection() 获取连接。
4. 核心 CRUD 操作
先把核心操作封装几个常用的学生表来演示。
4.1 初始化表结构
记得先有测试库 spiders,这里用连接池创建表:
def create_students_table():
sql = """
CREATE TABLE IF NOT EXISTS students (
id VARCHAR(255) NOT NULL COMMENT '学号',
name VARCHAR(255) NOT NULL COMMENT '姓名',
age INT NOT NULL COMMENT '年龄',
gender VARCHAR(10) DEFAULT NULL COMMENT '性别',
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='学生信息表'
"""
# with 自动管理连接和提交回滚
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(sql)
print("✅ 学生表创建/检查完成")
注意:PyMySQL 的 with 语句包裹连接,只会在退出时自动提交(前提是没有异常;有异常自动回滚)。
4.2 插入数据
4.2.1 单条插入
def insert_single_student():
sql = "INSERT INTO students(id, name, age) VALUES(%s, %s, %s)"
with get_connection() as conn:
with conn.cursor() as cursor:
# %s 是参数占位符,千万不能用字符串拼接!防 SQL 注入
cursor.execute(sql, ('20240001', '张三', 20))
print(f"✅ 插入成功,影响行数:{cursor.rowcount}")
4.2.2 批量插入(性能强推!
单条插入 1000 条可能要 1000 次 IO,批量只要 1 次左右!
def insert_batch_students():
sql = "INSERT INTO students(id, name, age) VALUES(%s, %s, %s)"
# 批量数据
data = [
('20240002', '李四', 21),
('20240003', '王五', 22),
('20240004', '赵六', 20)
]
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.executemany(sql, data)
print(f"✅ 批量插入成功,影响行数:{cursor.rowcount}")
4.2.3 存在则更新,不存在则插入(Upsert)
爬虫、同步数据必备!
def upsert_student():
sql = """
INSERT INTO students(id, name, age, gender)
VALUES(%s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
name=VALUES(name), age=VALUES(age), gender=VALUES(gender)
"""
with get_connection() as conn:
with conn.cursor() as cursor:
# 如果学号 20240001 存在,更新姓名年龄性别;不存在插入
cursor.execute(sql, ('20240001', '张三三', 23, '男'))
print(f"✅ Upsert 完成,影响行数:{cursor.rowcount}")
提示:ON DUPLICATE KEY UPDATE 后,影响行数:1 是插入,2 是更新。
4.3 更新、删除、查询
4.3.1 更新
def update_student():
sql = "UPDATE students SET age = %s WHERE name = %s"
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(sql, (21, '李四'))
4.3.2 删除
def delete_student():
sql = "DELETE FROM students WHERE age < %s"
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(sql, (21,))
print(f"✅ 删除成功,影响行数:{cursor.rowcount}")
4.3.3 查询
单条、多条、分页、大数据流式查询(大数据量不一次性加载)
def query_students():
# 1. 单条查询 fetchone()
# 2. 多条查询 fetchall()
# 3. 分页查询(常用
# 4. 大数据量流式查询(不要用 fetchall() 一次性拉到内存
pass
(篇幅有限,**分页和流式查询可以参考原代码核心,这里提重点:
- 分页用
LIMIT page_size OFFSET offset,注意 offset 大的时候性能下降,可优化为 WHERE id > last_id LIMIT page_size
- 流式查询直接用
fetchone() 循环,或者用 pymysql.cursors.SSCursor 替代普通游标。
5. 事务处理:保证数据一致性
事务的 ACID 特性在转账、订单、同步等关键操作中必不可少——要么全成功,要么全失败。
def transfer_money(from_account, to_account, amount):
# 先创建测试转账表
create_accounts_sql = """
CREATE TABLE IF NOT EXISTS accounts (
id VARCHAR(255) NOT NULL PRIMARY KEY,
balance DECIMAL(10,2) NOT NULL DEFAULT 0.00
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
# 插入测试数据
insert_test_data_sql = """
INSERT IGNORE INTO accounts(id, balance) VALUES
('A1001', 1000.00),
('A1002', 500.00);
"""
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(create_accounts_sql)
cursor.execute(insert_test_data_sql)
# 开始核心转账
try:
# 检查余额
cursor.execute("SELECT balance FROM accounts WHERE id = %s", (from_account,))
from_balance = cursor.fetchone()['balance']
if from_balance < amount:
raise ValueError(f"账户余额不足!当前余额:{from_balance}")
# 扣款
cursor.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_account)
)
# 模拟出错(测试回滚)
# raise Exception("网络中断!")
# 存款
cursor.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_account)
)
print(f"✅ 转账成功")
except Exception as e:
# 这里手动回滚(with 自动回滚,但显式更清楚,或者加逻辑判断
conn.rollback()
print(f"❌ 转账失败,已回滚:{e}")
6. 进阶:用 SQLAlchemy ORM 简化代码
手写 SQL 虽然灵活,但复杂业务逻辑容易出错。SQLAlchemy 是 Python 最流行的 ORM,把数据库表映射成 Python 类,操作像操作对象。
先安装:
pip install sqlalchemy pymysql
代码示例:
from sqlalchemy import create_engine, Column, String, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 1. 定义基类
Base = declarative_base()
# 2. 映射表(对应之前的 students 表
class StudentORM(Base):
__tablename__ = 'students'
id = Column(String(255), primary_key=True, comment='学号')
name = Column(String(255), nullable=False, comment='姓名')
age = Column(Integer, nullable=False, comment='年龄')
gender = Column(String(10), comment='性别')
# 3. 创建引擎(连接字符串格式:dialect+driver://user:pass@host:port/db
engine = create_engine(
'mysql+pymysql://root:123456@localhost:3306/spiders',
pool_size=10, # 内置连接池参数
pool_recycle=3600 # 避免 MySQL 默认 8 小时空闲断开
)
# 4. 创建表(如果不存在)
Base.metadata.create_all(engine)
# 5. 创建会话工厂(Session
SessionLocal = sessionmaker(bind=engine)
# 测试 ORM 操作
def test_orm():
db = SessionLocal()
try:
# 插入
new_stu = StudentORM(id='20240005', name='钱七', age=22, gender='女')
db.add(new_stu)
# 查询所有 >=20岁的学生
students = db.query(StudentORM).filter(StudentORM.age >= 20).all()
print("✅ 查询结果:")
for stu in students:
print(stu.id, stu.name, stu.age)
db.commit()
except Exception as e:
db.rollback()
print(f"❌ ORM 操作失败:{e}")
finally:
db.close()
test_orm()
7. 生产级最佳实践与避坑
- 必须用连接池:DBUtils 或者 SQLAlchemy 内置连接池
- 必须用参数化查询:防 SQL 注入(%s 占位符
- **字符集统一用
utf8mb4:支持 emoji/完整 Unicode
- 事务 ACID:关键操作加事务
- 资源释放:用
with 语句或显式关闭连接/会话
- 批量操作:大数据量用批量插入/更新
- 连接超时与重连:配置
pool_recycle/connect_timeout 等参数
- 索引优化:对常用查询的字段加索引
8. 总结
本教程从基础到进阶,覆盖了 Python 操作 MySQL 的常用功能。
入门学习用 PyMySQL 单次连接测试,
轻量级生产用 PyMySQL + DBUtils 连接池,
复杂业务逻辑用 SQLAlchemy ORM。
掌握这些,足以应对绝大多数 Python 与 MySQL 交互的场景!