Python 操作 MySQL 数据库
Python 接入 MySQL 数据库
在 Python 3 生态中,主流的 MySQL 驱动库有 mysqlclient 和 pymysql 两种。两者的 API 几乎完全一致,只在导入模块名上有差别。我们选择纯 Python 实现的 pymysql,它不需要编译 C 扩展,在 Windows、macOS、Linux 上都能一键安装成功,非常适合新手入门,也能胜任生产环境的快速搭建。
本教程以提前创建好的 hrs(人力资源)数据库为例,一步步演示 Python 操作 MySQL 的完整流程与代码。
第一步:环境准备与连接流程
安装依赖
如果你的 MySQL 是 5.x 版本,只安装 pymysql 即可;如果对接的是 MySQL 8.x,还需要额外安装 cryptography,以支持新版默认的 caching_sha2_password 认证方式。
在终端(或 PyCharm 内置 Terminal)中执行:
pip install pymysql cryptography
操作流程梳理
pymysql 的标准操作可以归纳为 5 个核心步骤,我们用寄快递的流程来帮助记忆:
- 建立连接(Connection) — 打通你和仓库之间的专属通道,告诉程序仓库地址、端口、账号密码、数据库名和编码。
- 获取游标(Cursor) — 拿到仓库管理员的“扫描枪”,之后的增删改查全靠它。
- 执行 SQL 指令 — 通过游标向数据库发送具体的 SQL 语句。
- 处理结果或事务 — 如果是写操作(插入、删除、修改),需要
commit 确认保存,出现问题则 rollback 撤销;如果是查询,就用 fetch 系列方法拿数据。
- 关闭连接 — 用完之后立即断开连接,释放资源。
只要记住这五步,任何数据库操作都能迅速写出模式规范的代码。
第二步:核心代码实战
准备工作:创建专用数据库用户
出于安全考虑,不建议在生产环境或学习中长期使用 root 超管账号。我们先在 MySQL 中创建一个只拥有 hrs 库基本权限的普通用户:
-- 创建允许任意 IP 连接的 guest 用户
create user 'guest'@'%' identified by 'Guest.618';
-- 为 guest 用户授予 hrs 数据库的基础增删改查权限
grant insert, delete, update, select on `hrs`.* to 'guest'@'%';
-- 刷新权限,让配置立即生效
flush privileges;
提示:以上操作需要使用 root 账号在 MySQL 命令行或 Navicat 等工具中执行。
插入单条数据
下面的代码模拟了用户输入部门信息并保存到数据库的场景。
import pymysql
# 模拟用户输入
no = int(input('请输入部门编号: '))
name = input('请输入部门名称: ')
location = input('请输入部门所在地: ')
# 1. 建立连接(charset 使用 utf8mb4 支持完整的中文和 emoji)
conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='guest',
password='Guest.618',
database='hrs',
charset='utf8mb4'
)
try:
# 2. 使用 with 获取游标,自动释放,无需手动关闭
with conn.cursor() as cursor:
# 3. 执行插入,%s 为占位符,pymysql 会自动转义,防止 SQL 注入
affected_rows = cursor.execute(
'insert into `tb_dept` (`dno`, `dname`, `dloc`) values (%s, %s, %s)',
(no, name, location)
)
if affected_rows == 1:
print('🎉 新增部门成功!')
# 4. 提交事务:只有 commit 后数据才会真正写入
conn.commit()
except pymysql.MySQLError as err:
# 出现异常时回滚
conn.rollback()
print(f'⚠️ 操作失败:{type(err)} - {err}')
finally:
# 5. 关闭连接
conn.close()
效率贴士:如果要批量插入上千条数据,推荐使用 cursor.executemany(sql, data_list),速度能提升几十倍。
删除数据
这里演示开启 autocommit=True 的写法,适合单条 SQL 自动提交的简单场景(多步操作不建议直接使用)。
import pymysql
no = int(input('请输入要删除的部门编号: '))
conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='guest',
password='Guest.618',
database='hrs',
charset='utf8mb4',
autocommit=True # 开启自动提交
)
try:
with conn.cursor() as cursor:
affected_rows = cursor.execute(
'delete from `tb_dept` where `dno` = %s',
(no,) # 只有一个参数时也必须写成元组
)
if affected_rows == 1:
print('🎉 删除部门成功!')
else:
print('⚠️ 未找到该部门!')
finally:
conn.close()
更新数据
更新操作仍需手动管理事务,保证数据一致性。
import pymysql
no = int(input('请输入要修改的部门编号: '))
name = input('请输入新的部门名称: ')
location = input('请输入新的部门所在地: ')
conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='guest',
password='Guest.618',
database='hrs',
charset='utf8mb4'
)
try:
with conn.cursor() as cursor:
affected_rows = cursor.execute(
'update `tb_dept` set `dname` = %s, `dloc` = %s where `dno` = %s',
(name, location, no)
)
if affected_rows == 1:
print('🎉 更新部门信息成功!')
conn.commit()
except pymysql.MySQLError as err:
conn.rollback()
print(f'⚠️ 操作失败:{err}')
finally:
conn.close()
查询数据
pymysql 提供了三种提取结果的方法,可根据数据量灵活选用:
fetchone():逐行返回元组,内存开销极小,适合大数据集。
fetchmany(size=n):一次获取 n 条记录。
fetchall():一次性返回全部结果,适合小数据集。
1. 普通游标逐行读取
import pymysql
conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='guest',
password='Guest.618',
database='hrs',
charset='utf8mb4'
)
try:
with conn.cursor() as cursor:
cursor.execute('select `dno`, `dname`, `dloc` from `tb_dept`')
print('📋 部门列表:')
row = cursor.fetchone()
while row:
print(f' - 编号:{row[0]} | 名称:{row[1]} | 地点:{row[2]}')
row = cursor.fetchone()
except pymysql.MySQLError as err:
print(f'⚠️ 查询失败:{err}')
finally:
conn.close()
2. 字典游标分页查询
默认游标返回的是 元组,字段顺序依赖 SQL 语句,可读性不高。使用 pymysql.cursors.DictCursor 可以让结果以 字典 形式返回,键名就是字段名,更加清晰直观。
import pymysql
page = int(input('请输入页码: '))
size = int(input('请输入每页大小: '))
conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='guest',
password='Guest.618',
database='hrs',
charset='utf8mb4'
)
try:
# 指定游标类型为 DictCursor
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(
'select `eno`, `ename`, `job`, `sal` from `tb_emp` order by `sal` desc limit %s, %s',
((page - 1) * size, size)
)
emp_list = cursor.fetchall()
if emp_list:
print(f'📋 第{page}页员工列表(按薪资降序):')
for emp in emp_list:
print(
f' - 工号:{emp["eno"]} | '
f'姓名:{emp["ename"]} | '
f'职位:{emp["job"]} | '
f'月薪:{emp["sal"]}'
)
else:
print('⚠️ 该页无数据!')
finally:
conn.close()
第三步:实战案例 — 导出数据到 Excel
日常工作中常需要将数据库表导出为 Excel 报表,这里使用纯 Python 的 openpyxl 库来实现。
安装 openpyxl
导出代码
目标:将 tb_emp 和 tb_dept 自然连接后,把员工的工号、姓名、职位、月薪、补贴、部门信息导出为 员工表.xlsx。
import openpyxl
import pymysql
# 1. 创建工作簿并修改工作表标题
wb = openpyxl.Workbook()
ws = wb.active
ws.title = '员工基本信息'
# 2. 写入表头
ws.append(('工号', '姓名', '职位', '月薪', '补贴', '部门'))
# 3. 连接数据库查询数据
conn = pymysql.connect(
host='127.0.0.1',
port=3306,
user='guest',
password='Guest.618',
database='hrs',
charset='utf8mb4'
)
try:
with conn.cursor() as cursor:
# coalesce(comm, 0):将 NULL 补贴替换为 0
cursor.execute(
'select `eno`, `ename`, `job`, `sal`, coalesce(`comm`, 0), `dname` '
'from `tb_emp` natural join `tb_dept`'
)
# 4. 逐行写入 Excel
row = cursor.fetchone()
while row:
ws.append(row)
row = cursor.fetchone()
# 5. 保存文件
wb.save('员工表.xlsx')
print('🎉 Excel 导出成功!文件名为「员工表.xlsx」')
except pymysql.MySQLError as err:
print(f'⚠️ 导出失败:{err}')
finally:
conn.close()
课后练习
尝试反过来操作:将 Excel 文件中的数据批量导入到 MySQL 的指定表中。
(提示:使用 openpyxl 的 iter_rows 读取 Excel 内容,结合 cursor.executemany() 实现高效批量插入。)