# Python接入MySQL与PostgreSQL数据库完整指南
# 引言
Python作为一门强大的后端开发语言,与关系型数据库的集成是现代Web应用开发的基础。本文将详细介绍如何使用Python连接和操作MySQL与PostgreSQL数据库,涵盖从基础连接到高级特性的各个方面,包括连接配置、CRUD操作、事务处理、连接池管理、性能优化等。
#1. 环境准备与依赖安装
#1.1 安装数据库驱动
# MySQL drivers
pip install PyMySQL                  # pure-Python implementation
pip install mysqlclient              # C extension, better performance
pip install mysql-connector-python   # official Oracle driver
# PostgreSQL drivers
pip install psycopg2-binary          # precompiled wheels, recommended
pip install psycopg2                 # compiled from source
# ORM frameworks
pip install SQLAlchemy               # general-purpose ORM
pip install Django                   # web framework with a bundled ORM
# Connection pooling
pip install DBUtils                  # generic connection pooling
pip install SQLAlchemy-Pool          # SQLAlchemy pooling helpers
# 1.2 数据库连接配置
# config.py - database configuration
# Central connection settings for both backends; values are placeholders.
DATABASE_CONFIG = {
    'mysql': {
        'host': 'localhost',
        'port': 3306,
        'user': 'your_username',
        'password': 'your_password',
        'database': 'your_database',
        'charset': 'utf8mb4',             # full Unicode incl. 4-byte chars
        'autocommit': False,              # explicit transaction control
        'connect_timeout': 10,
        'read_timeout': 30,
        'write_timeout': 30,
    },
    'postgresql': {
        'host': 'localhost',
        'port': 5432,
        'user': 'your_username',
        'password': 'your_password',
        'database': 'your_database',
        'connect_timeout': 10,
        'application_name': 'PythonApp',  # visible in pg_stat_activity
    },
}
# 2. MySQL数据库连接与操作
#2.1 基础连接操作
import pymysql
from contextlib import contextmanager
import logging
class MySQLDatabase:
    """Thin wrapper around pymysql that creates short-lived connections."""

    def __init__(self, config):
        # config: dict with host/port/user/password/database/charset/
        # autocommit/connect_timeout/read_timeout/write_timeout keys.
        self.config = config

    def get_connection(self):
        """Open and return a new pymysql connection; log and re-raise on failure."""
        try:
            return pymysql.connect(
                host=self.config['host'],
                port=self.config['port'],
                user=self.config['user'],
                password=self.config['password'],
                database=self.config['database'],
                charset=self.config['charset'],
                autocommit=self.config['autocommit'],
                connect_timeout=self.config['connect_timeout'],
                read_timeout=self.config['read_timeout'],
                write_timeout=self.config['write_timeout'],
            )
        except Exception as e:
            logging.error(f"MySQL连接失败: {e}")
            raise

    @contextmanager
    def get_db_connection(self):
        """Yield a fresh connection; roll back on error, always close it."""
        conn = None
        try:
            conn = self.get_connection()
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"数据库操作失败: {e}")
            raise
        finally:
            if conn:
                conn.close()
# Usage example: build a MySQLDatabase from an inline configuration.
mysql_config = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'password',
    'database': 'test_db',
    'charset': 'utf8mb4',
    'autocommit': False,
    'connect_timeout': 10,
    'read_timeout': 30,
    'write_timeout': 30,
}
db = MySQLDatabase(mysql_config)
# 2.2 CRUD操作实现
class MySQLCRUD:
    """Parameterized CRUD helpers on top of a MySQLDatabase instance.

    NOTE(review): table and column names are interpolated into the SQL
    text; callers must not pass untrusted identifiers (values themselves
    are always bound as parameters).
    """

    def __init__(self, db_instance):
        self.db = db_instance

    def create_record(self, table, data):
        """Insert one row; return the AUTO_INCREMENT id of the new row."""
        columns = ', '.join(data.keys())
        placeholders = ', '.join(['%s'] * len(data))
        sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
        with self.db.get_db_connection() as conn:
            # `with` closes the cursor deterministically (was left open before).
            with conn.cursor() as cursor:
                try:
                    cursor.execute(sql, list(data.values()))
                    conn.commit()
                    return cursor.lastrowid
                except Exception as e:
                    conn.rollback()
                    logging.error(f"插入数据失败: {e}")
                    raise

    def read_records(self, table, conditions=None, limit=None, offset=None):
        """Select rows as dicts, optionally filtered and paginated.

        Fixes: LIMIT/OFFSET are now bound as parameters instead of being
        interpolated into the SQL string, and 0 is accepted as a valid
        value (the old `if limit:` truthiness test silently dropped it).
        """
        sql = f"SELECT * FROM {table}"
        params = []
        if conditions:
            where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
            sql += f" WHERE {where_clause}"
            params.extend(list(conditions.values()))
        if limit is not None:
            sql += " LIMIT %s"
            params.append(limit)
        if offset is not None:
            # NOTE: MySQL requires LIMIT when OFFSET is used, as before.
            sql += " OFFSET %s"
            params.append(offset)
        with self.db.get_db_connection() as conn:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql, params)
                return cursor.fetchall()

    def update_record(self, table, data, conditions):
        """Update matching rows; return the number of affected rows."""
        set_clause = ", ".join([f"{k} = %s" for k in data.keys()])
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"UPDATE {table} SET {set_clause} WHERE {where_clause}"
        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    params = list(data.values()) + list(conditions.values())
                    cursor.execute(sql, params)
                    affected_rows = cursor.rowcount
                    conn.commit()
                    return affected_rows
                except Exception as e:
                    conn.rollback()
                    logging.error(f"更新数据失败: {e}")
                    raise

    def delete_record(self, table, conditions):
        """Delete matching rows; return the number of affected rows."""
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"DELETE FROM {table} WHERE {where_clause}"
        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    cursor.execute(sql, list(conditions.values()))
                    affected_rows = cursor.rowcount
                    conn.commit()
                    return affected_rows
                except Exception as e:
                    conn.rollback()
                    logging.error(f"删除数据失败: {e}")
                    raise
# Usage examples for MySQLCRUD.
crud = MySQLCRUD(db)
# Insert a row and capture its auto-generated id.
record_id = crud.create_record('users', {
    'name': 'John Doe',
    'email': 'john@example.com',
    'age': 30,
})
# Query rows matching a filter.
users = crud.read_records('users', {'name': 'John Doe'})
# Update the matching row.
affected = crud.update_record('users', {'age': 31}, {'id': record_id})
# Delete it again.
deleted = crud.delete_record('users', {'id': record_id})
# 2.3 高级MySQL特性
class MySQLAdvancedFeatures:
    """Batch, transaction, locking and stored-procedure helpers for MySQL."""

    def __init__(self, db_instance):
        self.db = db_instance

    def execute_batch_insert(self, table, columns, data_list):
        """Insert many rows with a single executemany() round trip."""
        placeholders = ', '.join(['%s'] * len(columns))
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.executemany(sql, data_list)
                conn.commit()
                return cursor.rowcount
            except Exception as e:
                conn.rollback()
                logging.error(f"批量插入失败: {e}")
                raise

    def execute_transaction(self, operations):
        """Run a list of (sql, params) pairs atomically; return their rowcounts."""
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                conn.begin()
                results = []
                for sql, params in operations:
                    cursor.execute(sql, params)
                    results.append(cursor.rowcount)
                conn.commit()
                return results
            except Exception as e:
                conn.rollback()
                logging.error(f"事务执行失败: {e}")
                raise

    def get_with_lock(self, table, conditions, lock_type='FOR UPDATE'):
        """Select a single row while holding a row lock (FOR UPDATE by default)."""
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"SELECT * FROM {table} WHERE {where_clause} {lock_type}"
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            cursor.execute(sql, list(conditions.values()))
            return cursor.fetchone()

    def execute_stored_procedure(self, procedure_name, args):
        """Call a stored procedure and commit; return callproc's result."""
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                result = cursor.callproc(procedure_name, args)
                conn.commit()
                return result
            except Exception as e:
                conn.rollback()
                logging.error(f"存储过程执行失败: {e}")
                raise
# Batch-insert example: three rows in one executemany() call.
advanced = MySQLAdvancedFeatures(db)
batch_data = [
    ('Alice', 'alice@example.com', 25),
    ('Bob', 'bob@example.com', 35),
    ('Charlie', 'charlie@example.com', 28),
]
batch_result = advanced.execute_batch_insert('users', ['name', 'email', 'age'], batch_data)
# 3. PostgreSQL数据库连接与操作
#3.1 基础连接操作
import psycopg2
from psycopg2.extras import RealDictCursor, DictCursor
from contextlib import contextmanager
import logging
class PostgreSQLDatabase:
    """Thin wrapper around psycopg2 that creates short-lived connections."""

    def __init__(self, config):
        # config: dict with host/port/user/password/database/
        # connect_timeout/application_name keys.
        self.config = config

    def get_connection(self):
        """Open and return a new psycopg2 connection; log and re-raise on failure."""
        try:
            return psycopg2.connect(
                host=self.config['host'],
                port=self.config['port'],
                user=self.config['user'],
                password=self.config['password'],
                database=self.config['database'],
                connect_timeout=self.config['connect_timeout'],
                application_name=self.config['application_name'],
            )
        except Exception as e:
            logging.error(f"PostgreSQL连接失败: {e}")
            raise

    @contextmanager
    def get_db_connection(self):
        """Yield a fresh connection; roll back on error, always close it."""
        conn = None
        try:
            conn = self.get_connection()
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"数据库操作失败: {e}")
            raise
        finally:
            if conn:
                conn.close()
# PostgreSQL CRUD helpers (named-parameter style: %(key)s).
class PostgreSQLCRUD:
    """CRUD helpers using psycopg2 named parameters.

    NOTE(review): read_records mixes condition keys with the 'limit' and
    'offset' parameter names — a condition column literally named "limit"
    or "offset" would collide; confirm callers never pass those.
    """

    def __init__(self, db_instance):
        self.db = db_instance

    def create_record(self, table, data):
        """Insert one row and return its id via RETURNING."""
        columns = ', '.join(data.keys())
        placeholders = ', '.join([f'%({key})s' for key in data.keys()])
        sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders}) RETURNING id"
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(sql, data)
                new_id = cursor.fetchone()[0]
                conn.commit()
                return new_id
            except Exception as e:
                conn.rollback()
                logging.error(f"插入数据失败: {e}")
                raise

    def read_records(self, table, conditions=None, limit=None, offset=None):
        """Select rows as plain dicts, optionally filtered and paginated."""
        sql = f"SELECT * FROM {table}"
        params = {}
        if conditions:
            where_clause = " AND ".join([f"{k} = %({k})s" for k in conditions.keys()])
            sql += f" WHERE {where_clause}"
            params.update(conditions)
        if limit:
            sql += " LIMIT %(limit)s"
            params['limit'] = limit
        if offset:
            sql += " OFFSET %(offset)s"
            params['offset'] = offset
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor(cursor_factory=RealDictCursor)
            cursor.execute(sql, params)
            return [dict(row) for row in cursor.fetchall()]

    def update_record(self, table, data, conditions):
        """Update matching rows; return the number of affected rows."""
        set_clause = ", ".join([f"{k} = %({k})s" for k in data.keys()])
        where_clause = " AND ".join([f"{k} = %({k})s" for k in conditions.keys()])
        sql = f"UPDATE {table} SET {set_clause} WHERE {where_clause}"
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                params = {**data, **conditions}
                cursor.execute(sql, params)
                affected_rows = cursor.rowcount
                conn.commit()
                return affected_rows
            except Exception as e:
                conn.rollback()
                logging.error(f"更新数据失败: {e}")
                raise

    def delete_record(self, table, conditions):
        """Delete matching rows; return the number of affected rows."""
        where_clause = " AND ".join([f"{k} = %({k})s" for k in conditions.keys()])
        sql = f"DELETE FROM {table} WHERE {where_clause}"
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(sql, conditions)
                affected_rows = cursor.rowcount
                conn.commit()
                return affected_rows
            except Exception as e:
                conn.rollback()
                logging.error(f"删除数据失败: {e}")
                raise
# PostgreSQL advanced features.
class PostgreSQLAdvancedFeatures:
    """Batch insert, UPSERT, JSON lookup and CTE helpers for PostgreSQL."""

    def __init__(self, db_instance):
        self.db = db_instance

    def execute_batch_insert(self, table, columns, data_list):
        """Insert many rows via psycopg2's execute_batch (fewer round trips).

        data_list must be a sequence of dicts keyed by `columns`.
        Fixes: dropped the unused `execute_values` import, and the old
        docstring wrongly claimed executemany was used.
        """
        from psycopg2.extras import execute_batch
        placeholders = ', '.join([f'%({col})s' for col in columns])
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                execute_batch(cursor, sql, data_list)
                conn.commit()
                # NOTE: after execute_batch, rowcount reflects only the last
                # internal page it executed, not the total inserted rows.
                return cursor.rowcount
            except Exception as e:
                conn.rollback()
                logging.error(f"批量插入失败: {e}")
                raise

    def execute_upsert(self, table, data, conflict_columns, update_columns):
        """INSERT ... ON CONFLICT DO UPDATE; return the affected row's id."""
        columns = ', '.join(data.keys())
        placeholders = ', '.join([f'%({key})s' for key in data.keys()])
        conflict_clause = ', '.join(conflict_columns)
        update_clause = ', '.join([f"{col} = EXCLUDED.{col}" for col in update_columns])
        sql = f"""
        INSERT INTO {table} ({columns}) VALUES ({placeholders})
        ON CONFLICT ({conflict_clause})
        DO UPDATE SET {update_clause}
        RETURNING id
        """
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(sql, data)
                new_id = cursor.fetchone()[0]
                conn.commit()
                return new_id
            except Exception as e:
                conn.rollback()
                logging.error(f"UPSERT操作失败: {e}")
                raise

    def execute_json_operations(self, table, json_conditions):
        """Select rows whose `data` JSON column has key == value (text compare)."""
        sql = f"""
        SELECT * FROM {table}
        WHERE data->>%s = %s
        """
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor(cursor_factory=RealDictCursor)
            cursor.execute(sql, [json_conditions['key'], json_conditions['value']])
            return [dict(row) for row in cursor.fetchall()]

    def execute_cte_query(self, cte_query):
        """Execute a caller-supplied CTE (WITH ...) query; return dict rows."""
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor(cursor_factory=RealDictCursor)
            cursor.execute(cte_query)
            return [dict(row) for row in cursor.fetchall()]
# Usage example.
pg_config = {
    'host': 'localhost',
    'port': 5432,
    'user': 'postgres',
    'password': 'password',
    'database': 'test_db',
    'connect_timeout': 10,
    'application_name': 'PythonApp',
}
pg_db = PostgreSQLDatabase(pg_config)
pg_crud = PostgreSQLCRUD(pg_db)
pg_advanced = PostgreSQLAdvancedFeatures(pg_db)
# 3.2 PostgreSQL特有功能
class PostgreSQLSpecificFeatures:
    """Array, JSON, window-function and full-text-search examples."""

    def __init__(self, db_instance):
        self.db = db_instance

    def handle_arrays(self, table, array_data):
        """Insert a row with an array column.

        NOTE(review): the `table` parameter is unused — the SQL targets the
        `products` table directly.
        """
        sql = """
INSERT INTO products (name, tags)
VALUES (%(name)s, %(tags)s)
"""
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, array_data)
            conn.commit()

    def handle_json(self, table, json_data):
        """Insert a row with a JSON/JSONB column.

        NOTE(review): `table` is likewise unused; targets `user_profiles`.
        """
        sql = """
INSERT INTO user_profiles (user_id, profile_data)
VALUES (%(user_id)s, %(profile_data)s)
"""
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, json_data)
            conn.commit()

    def execute_window_functions(self, query):
        """Run an arbitrary window-function query; return dict rows."""
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor(cursor_factory=RealDictCursor)
            cursor.execute(query)
            return [dict(row) for row in cursor.fetchall()]

    def execute_full_text_search(self, table, search_term):
        """Rank-ordered full-text search against a tsvector column."""
        sql = f"""
SELECT *, ts_rank(search_vector, plainto_tsquery(%s)) as rank
FROM {table}
WHERE search_vector @@ plainto_tsquery(%s)
ORDER BY rank DESC
"""
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor(cursor_factory=RealDictCursor)
            cursor.execute(sql, [search_term, search_term])
            return [dict(row) for row in cursor.fetchall()]
# 4. 连接池管理
#4.1 MySQL连接池
from DBUtils.PooledDB import PooledDB
import pymysql
class MySQLConnectionPool:
    """PooledDB-backed MySQL connection pool with a context-manager API."""

    def __init__(self, config, pool_size=10):
        self.pool = PooledDB(
            creator=pymysql,          # driver module used to open connections
            maxconnections=pool_size,
            mincached=2,              # idle connections opened at startup
            maxcached=5,              # max idle connections kept around
            maxshared=3,
            blocking=True,            # wait instead of raising when exhausted
            maxusage=None,            # unlimited reuse per connection
            setsession=[],
            ping=0,                   # never ping before use
            host=config['host'],
            port=config['port'],
            user=config['user'],
            password=config['password'],
            database=config['database'],
            charset=config['charset'],
        )

    def get_connection(self):
        """Borrow a connection from the pool."""
        return self.pool.connection()

    @contextmanager
    def get_db_connection(self):
        """Yield a pooled connection; roll back on error, return it on exit."""
        conn = None
        try:
            conn = self.get_connection()
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"数据库操作失败: {e}")
            raise
        finally:
            if conn:
                conn.close()  # for a pooled connection this returns it to the pool
# Pool usage example.
mysql_pool_config = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'password',
    'database': 'test_db',
    'charset': 'utf8mb4',
}
mysql_pool = MySQLConnectionPool(mysql_pool_config, pool_size=20)
# 4.2 PostgreSQL连接池
from psycopg2 import pool
import threading
class PostgreSQLConnectionPool:
    """ThreadedConnectionPool wrapper with explicit get/put and a context manager."""

    def __init__(self, config, minconn=1, maxconn=20):
        self.connection_pool = psycopg2.pool.ThreadedConnectionPool(
            minconn=minconn,
            maxconn=maxconn,
            host=config['host'],
            port=config['port'],
            user=config['user'],
            password=config['password'],
            database=config['database'],
            connect_timeout=config['connect_timeout'],
        )
        # NOTE(review): this lock is never used anywhere in this class;
        # confirm nothing external relies on it before removing.
        self.lock = threading.Lock()

    def get_connection(self):
        """Borrow a connection from the pool."""
        return self.connection_pool.getconn()

    def put_connection(self, conn):
        """Return a borrowed connection to the pool."""
        self.connection_pool.putconn(conn)

    @contextmanager
    def get_db_connection(self):
        """Yield a pooled connection; roll back on error, always return it."""
        conn = None
        try:
            conn = self.get_connection()
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"数据库操作失败: {e}")
            raise
        finally:
            if conn:
                self.put_connection(conn)
# PostgreSQL pool usage example.
pg_pool_config = {
    'host': 'localhost',
    'port': 5432,
    'user': 'postgres',
    'password': 'password',
    'database': 'test_db',
    'connect_timeout': 10,
}
pg_pool = PostgreSQLConnectionPool(pg_pool_config, minconn=2, maxconn=20)
# 5. ORM框架使用
#5.1 SQLAlchemy基础使用
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import os
Base = declarative_base()


class User(Base):
    """ORM model for the `users` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(100), nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    age = Column(Integer)
    # created_at set on insert; updated_at refreshed on every UPDATE.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class DatabaseManager:
    """Owns the SQLAlchemy engine/session factory and table creation."""

    def __init__(self, database_url):
        self.engine = create_engine(
            database_url,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,   # validate connections before handing them out
            pool_recycle=3600,    # recycle connections older than an hour
        )
        self.SessionLocal = sessionmaker(bind=self.engine)

    def create_tables(self):
        """Create all tables registered on Base.metadata (idempotent)."""
        Base.metadata.create_all(bind=self.engine)

    @contextmanager
    def get_db_session(self):
        """Yield a session; commit on success, roll back on error, always close."""
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            logging.error(f"数据库操作失败: {e}")
            raise
        finally:
            session.close()
# MySQL SQLAlchemy URL (PyMySQL driver).
mysql_db_url = "mysql+pymysql://username:password@localhost:3306/database_name?charset=utf8mb4"
# PostgreSQL SQLAlchemy URL (psycopg2 driver).
pg_db_url = "postgresql+psycopg2://username:password@localhost:5432/database_name"
# Build a manager against MySQL and create the mapped tables.
db_manager = DatabaseManager(mysql_db_url)
db_manager.create_tables()
# SQLAlchemy CRUD operations.
class SQLAlchemyCRUD:
    """ORM-based CRUD for the User model.

    NOTE(review): get_user/get_users return ORM instances after their
    session has been committed and closed; attribute access on expired
    instances may raise DetachedInstanceError — confirm callers only read
    already-loaded attributes.
    """

    def __init__(self, db_manager):
        self.db_manager = db_manager

    def create_user(self, name, email, age):
        """Insert a user and return the generated primary key."""
        with self.db_manager.get_db_session() as session:
            user = User(name=name, email=email, age=age)
            session.add(user)
            session.flush()  # assigns the id without committing yet
            return user.id

    def get_user(self, user_id):
        """Fetch a single user by primary key (or None)."""
        with self.db_manager.get_db_session() as session:
            return session.query(User).filter(User.id == user_id).first()

    def get_users(self, skip=0, limit=10):
        """Fetch a page of users."""
        with self.db_manager.get_db_session() as session:
            return session.query(User).offset(skip).limit(limit).all()

    def update_user(self, user_id, **kwargs):
        """Apply attribute updates to one user; True if the user existed."""
        with self.db_manager.get_db_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if user is None:
                return False
            for key, value in kwargs.items():
                setattr(user, key, value)
            return True

    def delete_user(self, user_id):
        """Delete one user; True if the user existed."""
        with self.db_manager.get_db_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if user is None:
                return False
            session.delete(user)
            return True
# SQLAlchemy CRUD usage.
sqlalchemy_crud = SQLAlchemyCRUD(db_manager)
user_id = sqlalchemy_crud.create_user("John Doe", "john@example.com", 30)
user = sqlalchemy_crud.get_user(user_id)
# 5.2 SQLAlchemy高级特性
from sqlalchemy import func, and_, or_, desc, asc
from sqlalchemy.orm import joinedload, selectinload
class SQLAlchemyAdvanced:
    """Aggregate, bulk and relationship query examples."""

    def __init__(self, db_manager):
        self.db_manager = db_manager

    def complex_query(self):
        """Run an aggregate, a filtered and an ordered query in one session."""
        with self.db_manager.get_db_session() as session:
            # Whole-table aggregates.
            result = session.query(
                func.count(User.id).label('total_users'),
                func.avg(User.age).label('avg_age'),
                func.max(User.created_at).label('latest_user'),
            ).first()
            # Combined conditions.
            young_users = session.query(User).filter(
                and_(
                    User.age < 30,
                    User.created_at > datetime(2023, 1, 1),
                )
            ).all()
            # Multi-key ordering.
            ordered_users = session.query(User).order_by(
                desc(User.age), asc(User.name)
            ).limit(10).all()
            return result, young_users, ordered_users

    def bulk_operations(self, users_data):
        """Bulk insert/update/delete in one session."""
        with self.db_manager.get_db_session() as session:
            # Bulk insert without per-object event overhead.
            session.bulk_save_objects([User(**data) for data in users_data])
            # Bulk update: raise any age below 18 to 18.
            session.query(User).filter(User.age < 18).update({
                User.age: 18
            })
            # Bulk delete implausible ages.
            session.query(User).filter(User.age > 100).delete()

    def relationship_query(self):
        """Placeholder for eager-loaded relationship queries."""
        # Example, if an Order model related to User existed:
        # session.query(User).options(joinedload(User.orders)).all()
        pass
# 6. 性能优化技巧
#6.1 查询优化
class PerformanceOptimization:
    """Query-level performance helpers (pagination etc.)."""

    def __init__(self, db_instance):
        self.db = db_instance

    def optimized_pagination(self, table, page, page_size, filters=None):
        """Paginate with ORDER BY id + LIMIT/OFFSET.

        NOTE(review): despite the original comment, this is classic offset
        pagination, not keyset/cursor pagination — deep pages still scan.
        """
        offset = (page - 1) * page_size
        if filters:
            where_clause = " AND ".join([f"{k} = %s" for k in filters.keys()])
            sql = f"""
SELECT * FROM {table}
WHERE {where_clause}
ORDER BY id
LIMIT %s OFFSET %s
"""
            params = list(filters.values()) + [page_size, offset]
        else:
            sql = f"SELECT * FROM {table} ORDER BY id LIMIT %s OFFSET %s"
            params = [page_size, offset]
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return cursor.fetchall()

    def prepared_statement_example(self):
        """Placeholder: prepared statements would be enabled at connection level."""
        pass

    def connection_keep_alive(self):
        """Placeholder: keepalive is configured via connection parameters."""
        pass
# Index-creation statements for typical query patterns.
def create_optimized_indexes():
    """Return (mysql_indexes, pg_indexes) lists of DDL statements."""
    mysql_indexes = [
        "CREATE INDEX idx_users_email ON users(email)",
        "CREATE INDEX idx_users_age_created ON users(age, created_at)",
        "CREATE INDEX idx_orders_user_status ON orders(user_id, status)",
    ]
    pg_indexes = [
        "CREATE INDEX idx_users_email ON users(email)",
        "CREATE INDEX idx_users_age_created ON users(age, created_at)",
        # CONCURRENTLY avoids blocking writes while the index builds
        "CREATE INDEX CONCURRENTLY idx_large_table_col ON large_table(column_name)",
    ]
    return mysql_indexes, pg_indexes
# 6.2 连接和资源管理
import asyncio
import aiomysql
import asyncpg
from contextlib import asynccontextmanager
class AsyncDatabaseManager:
    """Async connection-pool manager over aiomysql or asyncpg.

    db_type selects the backend ('mysql' -> aiomysql, 'postgresql' ->
    asyncpg); config is passed straight through to create_pool().
    """

    def __init__(self, db_type, config):
        self.db_type = db_type
        self.config = config
        self.pool = None  # created lazily by initialize_pool()

    async def initialize_pool(self):
        """Create the backend-specific pool."""
        if self.db_type == 'mysql':
            self.pool = await aiomysql.create_pool(**self.config)
        elif self.db_type == 'postgresql':
            self.pool = await asyncpg.create_pool(**self.config)

    @asynccontextmanager
    async def get_connection(self):
        """Acquire a pooled connection for the duration of the block."""
        async with self.pool.acquire() as conn:
            yield conn

    async def close_pool(self):
        """Close the pool, using the correct API per backend.

        Fix: the original unconditionally called pool.close() followed by
        `await pool.wait_closed()`, which is aiomysql's API only — asyncpg
        pools expose a coroutine close() and no wait_closed(), so the
        PostgreSQL path crashed.
        """
        if self.pool is None:
            return
        if self.db_type == 'mysql':
            self.pool.close()
            await self.pool.wait_closed()
        else:
            await self.pool.close()
# Connection-pool best practices.
class BestPractices:
    """Reference configuration values for pooling, timeouts and error handling."""

    def __init__(self):
        # Recommended timeout budget for DB operations (seconds).
        self.timeout_settings = {
            'connect_timeout': 10,
            'read_timeout': 30,
            'write_timeout': 30,
            'command_timeout': 60,
        }

    def connection_configuration(self):
        """Return a recommended connection/pool configuration dict."""
        return {
            # pool sizing
            'pool_size': 10,
            'max_overflow': 20,
            'pool_timeout': 30,
            'pool_recycle': 3600,   # recycle connections hourly
            'pool_pre_ping': True,  # validate before use
            # timeouts
            'connect_timeout': 10,
            'read_timeout': 30,
            'write_timeout': 30,
            # misc
            'charset': 'utf8mb4',
            'autocommit': False,
            'sql_mode': 'STRICT_TRANS_TABLES',
        }

    def error_handling_strategy(self):
        """Return the recommended error-handling toggles."""
        return {
            'retry_logic': True,
            'circuit_breaker': True,
            'fallback_methods': True,
            'logging': True,
        }
# 7. 安全最佳实践
#7.1 SQL注入防护
class SecurityBestPractices:
    """SQL-injection defences: identifier whitelists plus parameterized values."""

    def __init__(self, db_instance):
        self.db = db_instance

    def safe_query_with_validation(self, table, conditions):
        """Query with table/column whitelisting and parameterized values."""
        # Whitelist of tables this helper may touch.
        allowed_tables = ['users', 'orders', 'products']
        if table not in allowed_tables:
            raise ValueError(f"不允许的表名: {table}")
        # Whitelist of columns per table.
        allowed_columns = {
            'users': ['id', 'name', 'email', 'age'],
            'orders': ['id', 'user_id', 'status', 'amount'],
            'products': ['id', 'name', 'price', 'category'],
        }
        for col in conditions.keys():
            if col not in allowed_columns.get(table, []):
                raise ValueError(f"不允许的列名: {col}")
        # Values always go through driver-side parameter binding.
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"SELECT * FROM {table} WHERE {where_clause}"
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, list(conditions.values()))
            return cursor.fetchall()

    def input_sanitization(self, user_input):
        """Strip SQL keywords and comment markers from free-form input.

        NOTE(review): keyword stripping is defence-in-depth at best — it is
        no substitute for parameterized queries.
        """
        import re
        cleaned = re.sub(r'\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|UNION)\b', '',
                         user_input, flags=re.IGNORECASE)
        cleaned = re.sub(r'/\*.*?\*/', '', cleaned)  # block comments
        cleaned = re.sub(r'--.*', '', cleaned)       # line comments
        return cleaned.strip()

    def use_parameterized_queries(self, table, data):
        """Insert using placeholders so values never touch the SQL text."""
        columns = ', '.join(data.keys())
        placeholders = ', '.join(['%s'] * len(data))
        sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, list(data.values()))
            return cursor.rowcount

# Bad practice (vulnerable to SQL injection) — building SQL by concatenation:
# sql = f"INSERT INTO {table} ({columns}) VALUES ({values_str})"
# 7.2 连接安全
def secure_connection_config():
    """Return (mysql, postgresql) connection configs with TLS enabled."""
    secure_mysql_config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'app_user',
        'password': 'secure_password',  # load from environment in real code
        'database': 'app_db',
        'charset': 'utf8mb4',
        'ssl': {
            'ssl_ca': '/path/to/ca.pem',
            'ssl_cert': '/path/to/client-cert.pem',
            'ssl_key': '/path/to/client-key.pem',
        },
        'autocommit': False,
        'connect_timeout': 10,
    }
    secure_pg_config = {
        'host': 'localhost',
        'port': 5432,
        'user': 'app_user',
        'password': 'secure_password',  # load from environment in real code
        'database': 'app_db',
        'sslmode': 'require',  # or 'verify-full' for hostname verification
        'connect_timeout': 10,
        'application_name': 'SecureApp',
    }
    return secure_mysql_config, secure_pg_config
# Configuration from environment variables (.env supported via python-dotenv).
import os
from dotenv import load_dotenv

load_dotenv()

DB_CONFIG = {
    'mysql': {
        'host': os.getenv('MYSQL_HOST', 'localhost'),
        'port': int(os.getenv('MYSQL_PORT', 3306)),
        'user': os.getenv('MYSQL_USER'),
        'password': os.getenv('MYSQL_PASSWORD'),
        'database': os.getenv('MYSQL_DATABASE'),
        'charset': 'utf8mb4',
    },
    'postgresql': {
        'host': os.getenv('PG_HOST', 'localhost'),
        'port': int(os.getenv('PG_PORT', 5432)),
        'user': os.getenv('PG_USER'),
        'password': os.getenv('PG_PASSWORD'),
        'database': os.getenv('PG_DATABASE'),
    },
}
# 8. 监控和调试
#8.1 性能监控
import time
import functools
from typing import Callable, Any
def monitor_db_performance(func: Callable) -> Callable:
    """Decorator that logs duration and success of a DB operation.

    Fixes: removed the unused ``start_memory`` placeholder and simplified
    the success/result bookkeeping — returning from inside ``try`` makes
    the ``finally`` block's logging unambiguous on both paths.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        success = False
        try:
            result = func(*args, **kwargs)
            success = True
            return result
        finally:
            duration = time.time() - start_time
            # Record the performance metric for every call, pass or fail.
            logging.info(f"DB Operation: {func.__name__}, "
                         f"Duration: {duration:.4f}s, "
                         f"Success: {success}")
            # Flag anything slower than one second.
            if duration > 1.0:
                logging.warning(f"Slow query detected: {func.__name__}, "
                                f"Duration: {duration:.4f}s")
    return wrapper
class DatabaseMonitoring:
    """Counts queries/errors and wraps execution with the performance monitor."""

    def __init__(self):
        self.query_count = 0     # successful query executions
        self.slow_queries = []   # reserved for recording slow statements
        self.error_count = 0     # failed executions

    @monitor_db_performance
    def monitored_query(self, connection, query, params=None):
        """Execute a query, update the counters, and always close the cursor."""
        cursor = connection.cursor()
        try:
            cursor.execute(query, params)
            rows = cursor.fetchall()
            self.query_count += 1
            return rows
        except Exception:
            self.error_count += 1
            raise
        finally:
            cursor.close()
# Query-plan inspection.
def explain_query_performance(connection, query, params=None):
    """Print and return the EXPLAIN plan for a query (MySQL syntax)."""
    cursor = connection.cursor()
    explain_sql = f"EXPLAIN {query}"
    cursor.execute(explain_sql, params)
    plan_rows = cursor.fetchall()
    print("Query Execution Plan:")
    for row in plan_rows:
        print(row)
    cursor.close()
    return plan_rows
# 8.2 日志记录
import logging
import json
from datetime import datetime
class DatabaseLogger:
    """Structured (JSON) file logger for database operations."""

    def __init__(self, logger_name='db_logger'):
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        # Fix: logging.getLogger returns a shared singleton per name, so
        # the original attached a new FileHandler on every instantiation,
        # duplicating every log line. Only attach a handler once.
        if not self.logger.handlers:
            handler = logging.FileHandler('database_operations.log')
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            self.logger.addHandler(handler)

    def log_query(self, query, params, duration, connection_info):
        """Write a structured JSON record for one executed query."""
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'params': params,
            'duration': duration,
            'connection_info': connection_info,
            # NOTE: hash() is salted per process (PYTHONHASHSEED), so this
            # only groups similar queries within a single run.
            'query_hash': hash(query)
        }
        self.logger.info(json.dumps(log_data))

    def log_slow_query(self, query, params, duration, threshold=1.0):
        """Warn when a query exceeded the slow threshold (seconds)."""
        if duration > threshold:
            self.logger.warning(f"SLOW QUERY (>{threshold}s): {query[:100]}...")

    def log_error(self, query, params, error):
        """Write a structured JSON record for a failed query."""
        error_data = {
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'params': params,
            'error': str(error),
            'error_type': type(error).__name__
        }
        self.logger.error(json.dumps(error_data))
# Logging usage example.
db_logger = DatabaseLogger()


def execute_with_logging(connection, query, params=None):
    """Execute a query, recording timing, slow-query and error logs."""
    start_time = time.time()
    try:
        cursor = connection.cursor()
        cursor.execute(query, params)
        result = cursor.fetchall()
        duration = time.time() - start_time
        # NOTE(review): assumes a pymysql-style connection exposing
        # `.host` and `.db` attributes — confirm for other drivers.
        db_logger.log_query(query, params, duration,
                            {'host': connection.host, 'database': connection.db})
        db_logger.log_slow_query(query, params, duration)
        return result
    except Exception as e:
        duration = time.time() - start_time
        db_logger.log_error(query, params, e)
        raise
# 9. 实际应用示例
#9.1 Web应用集成
from flask import Flask, request, jsonify
from contextlib import contextmanager
app = Flask(__name__)


class WebAppDatabase:
    """Per-request ORM session management for the web application."""

    def __init__(self, db_manager):
        self.db_manager = db_manager

    @contextmanager
    def get_db_session(self):
        """Yield a session; commit on success, roll back on failure, always close."""
        session = self.db_manager.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
# Flask route examples.
@app.route('/users', methods=['GET'])
def get_users():
    """Paginated user listing as JSON.

    NOTE(review): this reads `app.db_manager`, which is never assigned in
    this file — a DatabaseManager must be wired onto the app at startup.
    """
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    with app.db_manager.get_db_session() as session:
        users = (session.query(User)
                 .offset((page - 1) * per_page)
                 .limit(per_page)
                 .all())
        return jsonify({
            'users': [{'id': u.id, 'name': u.name, 'email': u.email} for u in users],
            'page': page,
            'per_page': per_page
        })
@app.route('/users', methods=['POST'])
def create_user():
    """Create a user from a JSON body: {name, email, age}."""
    data = request.get_json()
    with app.db_manager.get_db_session() as session:
        user = User(name=data['name'], email=data['email'], age=data['age'])
        session.add(user)
        session.commit()
        return jsonify({'id': user.id}), 201
# Global error handler.
@app.errorhandler(Exception)
def handle_exception(e):
    """Return any unhandled exception as a JSON 500 response.

    Fix: the original called app.db_manager.SessionLocal().rollback(),
    which creates a brand-new session and rolls back nothing (a no-op);
    per-request cleanup already happens inside get_db_session, so the
    call was removed.
    """
    return jsonify({'error': str(e)}), 500
# 9.2 数据迁移脚本
import csv
import json
from typing import List, Dict, Any
class DataMigration:
    """Copies table data between two database wrappers and to/from CSV."""

    def __init__(self, source_db, target_db):
        self.source_db = source_db
        self.target_db = target_db

    def migrate_table(self, table_name: str, batch_size: int = 1000):
        """Stream rows from the source table into the target in batches."""
        with self.source_db.get_db_connection() as source_conn:
            source_cursor = source_conn.cursor()
            source_cursor.execute(f"SELECT * FROM {table_name}")
            while True:
                rows = source_cursor.fetchmany(batch_size)
                if not rows:
                    break
                # Convert each row tuple into a column-keyed dict.
                columns = [desc[0] for desc in source_cursor.description]
                data_list = [dict(zip(columns, row)) for row in rows]
                self.insert_batch(target_table=table_name, data_list=data_list)

    def export_to_csv(self, table_name: str, filename: str):
        """Dump a table (header + rows) to a UTF-8 CSV file."""
        with self.source_db.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(f"SELECT * FROM {table_name}")
            columns = [desc[0] for desc in cursor.description]
            with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(columns)  # header row
                for row in cursor:
                    writer.writerow(row)

    def import_from_csv(self, table_name: str, filename: str):
        """Load a CSV (with header row) and bulk-insert it into a table."""
        with open(filename, 'r', encoding='utf-8') as csvfile:
            data_list = list(csv.DictReader(csvfile))
        self.insert_batch(table_name, data_list)

    def insert_batch(self, target_table: str, data_list: List[Dict[str, Any]]):
        """executemany() a list of same-shaped dicts into the target table."""
        if not data_list:
            return
        with self.target_db.get_db_connection() as conn:
            cursor = conn.cursor()
            columns = list(data_list[0].keys())
            placeholders = ', '.join(['%s'] * len(columns))
            sql = f"INSERT INTO {target_table} ({', '.join(columns)}) VALUES ({placeholders})"
            values_list = [tuple(row[col] for col in columns) for row in data_list]
            cursor.executemany(sql, values_list)
            conn.commit()
# Migration usage: copy `users` from MySQL to PostgreSQL in batches of 500.
# Fix: the MySQL wrapper created earlier in this file is named `db`;
# `mysql_db` was never defined anywhere.
migration = DataMigration(source_db=db, target_db=pg_db)
migration.migrate_table('users', batch_size=500)
# 10. 常见问题与解决方案
#10.1 连接问题
class ConnectionTroubleshooting:
    """Connectivity checklist, connection smoke test and retry helper."""

    def __init__(self):
        # Ordered checklist for diagnosing connection failures.
        self.checklist = [
            "检查数据库服务是否运行",
            "验证连接参数(主机、端口、用户名、密码)",
            "检查防火墙设置",
            "验证网络连通性",
            "检查数据库用户权限",
            "查看数据库日志"
        ]

    def test_connection(self, config):
        """Open a connection, run SELECT 1 and report success as a bool."""
        try:
            driver_kwargs = {k: v for k, v in config.items()
                             if k != 'database_type'}
            if config.get('database_type') == 'mysql':
                import pymysql
                conn = pymysql.connect(**driver_kwargs)
            else:
                import psycopg2
                conn = psycopg2.connect(**driver_kwargs)
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            result = cursor.fetchone()
            conn.close()
            return result is not None
        except Exception as e:
            logging.error(f"连接测试失败: {e}")
            return False

    def connection_retry_mechanism(self, connect_func, max_retries=3, delay=1):
        """Call connect_func with exponential backoff; re-raise after max_retries."""
        import time
        for attempt in range(max_retries):
            try:
                return connect_func()
            except Exception as e:
                if attempt == max_retries - 1:
                    raise e
                logging.warning(f"连接失败,{delay}秒后重试... (尝试 {attempt + 1}/{max_retries})")
                time.sleep(delay)
                delay *= 2  # exponential backoff
# 10.2 性能问题
class PerformanceTroubleshooting:
    """Slow-query detection statements and a catalogue of common fixes."""

    def __init__(self):
        pass

    def identify_slow_queries(self, db_instance):
        """Return (mysql_sql, postgres_sql) statements listing queries running >60s."""
        mysql_slow_query = """
SELECT * FROM information_schema.processlist
WHERE TIME > 60 AND COMMAND != 'Sleep'
"""
        pg_slow_query = """
SELECT pid, now() - pg_stat_activity.query_start AS duration,
query, state
FROM pg_stat_activity
WHERE (now() - pg_stat_activity.query_start) > interval '60 seconds'
AND state = 'active'
"""
        return mysql_slow_query, pg_slow_query

    def optimize_common_issues(self):
        """Map common performance problems to their standard remedies."""
        return {
            "missing_indexes": "CREATE INDEX ON table(column)",
            "inefficient_joins": "Add proper indexes on join columns",
            "full_table_scans": "Add WHERE clause indexes",
            "large_result_sets": "Implement pagination",
            "n_plus_one_queries": "Use JOINs or eager loading"
        }
# 相关教程
#总结
Python接入MySQL和PostgreSQL数据库是现代Web开发的重要技能:
- MySQL:适合Web应用,连接简单,社区支持广泛
- PostgreSQL:功能丰富,支持高级数据类型,适合复杂应用
无论选择哪种数据库,都应该:
- 使用连接池:提高性能和资源利用率
- 参数化查询:防止SQL注入攻击
- 事务管理:确保数据一致性
- 错误处理:优雅处理异常情况
- 性能监控:持续优化查询性能
- 安全配置:保护数据库连接安全
通过合理的设计和实现,可以构建高效、安全、可维护的数据库应用系统。

