Python接入MySQL与PostgreSQL数据库完整指南

引言

Python作为一门强大的后端开发语言,与关系型数据库的集成是现代Web应用开发的基础。本文将详细介绍如何使用Python连接和操作MySQL与PostgreSQL数据库,涵盖从基础连接到高级特性的各个方面,包括连接配置、CRUD操作、事务处理、连接池管理、性能优化等。

1. 环境准备与依赖安装

1.1 安装数据库驱动

# MySQL数据库驱动
pip install PyMySQL  # 纯Python实现
pip install mysqlclient  # C扩展,性能更好
pip install mysql-connector-python  # Oracle官方驱动

# PostgreSQL数据库驱动
pip install psycopg2-binary  # 预编译版本,安装简单,适合开发与测试
pip install psycopg2  # 源码编译版本,官方建议生产环境使用

# ORM框架
pip install SQLAlchemy  # 通用ORM
pip install Django  # Web框架自带ORM

# 连接池
pip install DBUtils  # 通用连接池
# 注意:SQLAlchemy 自带连接池(QueuePool 等),无需安装名为 SQLAlchemy-Pool 的额外包

1.2 数据库连接配置

# config.py -- central database connection settings.
# NOTE(review): credentials here are placeholders; real values should come
# from the environment (see the dotenv-based DB_CONFIG later in this guide).
DATABASE_CONFIG = {
    'mysql': {
        'host': 'localhost',
        'port': 3306,                 # MySQL default port
        'user': 'your_username',
        'password': 'your_password',
        'database': 'your_database',
        'charset': 'utf8mb4',         # full Unicode support (incl. 4-byte chars)
        'autocommit': False,          # commit/rollback is done explicitly
        'connect_timeout': 10,        # seconds
        'read_timeout': 30,
        'write_timeout': 30
    },
    'postgresql': {
        'host': 'localhost',
        'port': 5432,                 # PostgreSQL default port
        'user': 'your_username',
        'password': 'your_password',
        'database': 'your_database',
        'connect_timeout': 10,        # seconds
        'application_name': 'PythonApp'  # client label reported to the server
    }
}

2. MySQL数据库连接与操作

2.1 基础连接操作

import pymysql
from contextlib import contextmanager
import logging

class MySQLDatabase:
    """Thin wrapper around pymysql for opening MySQL connections.

    Connection parameters are read from the ``config`` mapping passed at
    construction time; missing keys surface as errors when connecting.
    """

    # Config keys forwarded verbatim to pymysql.connect().
    _CONN_KEYS = (
        'host', 'port', 'user', 'password', 'database', 'charset',
        'autocommit', 'connect_timeout', 'read_timeout', 'write_timeout',
    )

    def __init__(self, config):
        self.config = config

    def get_connection(self):
        """Open and return a new pymysql connection; log and re-raise on failure."""
        try:
            kwargs = {key: self.config[key] for key in self._CONN_KEYS}
            return pymysql.connect(**kwargs)
        except Exception as e:
            logging.error(f"MySQL连接失败: {e}")
            raise

    @contextmanager
    def get_db_connection(self):
        """Context manager: yield a connection, roll back on error, always close."""
        conn = None
        try:
            conn = self.get_connection()
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"数据库操作失败: {e}")
            raise
        finally:
            if conn:
                conn.close()

# Usage example: concrete settings for a local development database.
mysql_config = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'password',
    'database': 'test_db',
    'charset': 'utf8mb4',
    'autocommit': False,       # transactions are committed explicitly
    'connect_timeout': 10,     # seconds
    'read_timeout': 30,
    'write_timeout': 30
}

db = MySQLDatabase(mysql_config)  # shared instance used by the CRUD examples below

2.2 CRUD操作实现

class MySQLCRUD:
    """Generic CRUD helper on top of MySQLDatabase.

    Table and column names are interpolated into the SQL text and must
    therefore come from trusted code (see SecurityBestPractices for
    whitelist validation); all *values* are bound as query parameters.
    """

    def __init__(self, db_instance):
        self.db = db_instance

    def create_record(self, table, data):
        """Insert one row; return its auto-increment primary key."""
        columns = ', '.join(data.keys())
        placeholders = ', '.join(['%s'] * len(data))
        sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:  # FIX: cursor is now closed deterministically
                try:
                    cursor.execute(sql, list(data.values()))
                    conn.commit()
                    return cursor.lastrowid
                except Exception as e:
                    conn.rollback()
                    logging.error(f"插入数据失败: {e}")
                    raise

    def read_records(self, table, conditions=None, limit=None, offset=None):
        """Select rows as dicts, optionally filtered and paginated.

        FIX: LIMIT/OFFSET are now bound as parameters instead of being
        formatted into the SQL string (injection vector if user-supplied),
        and ``limit=0`` / ``offset=0`` are honoured -- the old ``if limit:``
        test silently dropped them.
        """
        sql = f"SELECT * FROM {table}"
        params = []

        if conditions:
            where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
            sql += f" WHERE {where_clause}"
            params.extend(list(conditions.values()))

        if limit is not None:
            sql += " LIMIT %s"
            params.append(int(limit))
            if offset is not None:
                sql += " OFFSET %s"
                params.append(int(offset))

        with self.db.get_db_connection() as conn:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql, params)
                return cursor.fetchall()

    def update_record(self, table, data, conditions):
        """Update matching rows; return the affected-row count."""
        set_clause = ", ".join([f"{k} = %s" for k in data.keys()])
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"UPDATE {table} SET {set_clause} WHERE {where_clause}"

        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    params = list(data.values()) + list(conditions.values())
                    cursor.execute(sql, params)
                    affected_rows = cursor.rowcount
                    conn.commit()
                    return affected_rows
                except Exception as e:
                    conn.rollback()
                    logging.error(f"更新数据失败: {e}")
                    raise

    def delete_record(self, table, conditions):
        """Delete matching rows; return the affected-row count."""
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"DELETE FROM {table} WHERE {where_clause}"

        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    cursor.execute(sql, list(conditions.values()))
                    affected_rows = cursor.rowcount
                    conn.commit()
                    return affected_rows
                except Exception as e:
                    conn.rollback()
                    logging.error(f"删除数据失败: {e}")
                    raise

# Usage example: one full CRUD round trip against the `users` table.
crud = MySQLCRUD(db)
# Insert a row; returns the auto-increment primary key.
record_id = crud.create_record('users', {
    'name': 'John Doe',
    'email': 'john@example.com',
    'age': 30
})

# Select rows matching the given equality conditions.
users = crud.read_records('users', {'name': 'John Doe'})

# Update matching rows; returns the affected-row count.
affected = crud.update_record('users', {'age': 31}, {'id': record_id})

# Delete matching rows; returns the affected-row count.
deleted = crud.delete_record('users', {'id': record_id})

2.3 高级MySQL特性

class MySQLAdvancedFeatures:
    """Batch inserts, multi-statement transactions, row locks and stored procedures."""

    # Only these locking clauses may ever be appended to a SELECT.
    _ALLOWED_LOCKS = ('FOR UPDATE', 'LOCK IN SHARE MODE', 'FOR SHARE')

    def __init__(self, db_instance):
        self.db = db_instance

    def execute_batch_insert(self, table, columns, data_list):
        """Insert many rows with a single executemany() round trip.

        ``data_list`` is a sequence of tuples matching ``columns`` positionally.
        """
        placeholders = ', '.join(['%s'] * len(columns))
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"

        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    cursor.executemany(sql, data_list)
                    conn.commit()
                    return cursor.rowcount
                except Exception as e:
                    conn.rollback()
                    logging.error(f"批量插入失败: {e}")
                    raise

    def execute_transaction(self, operations):
        """Run a list of (sql, params) pairs atomically; return their rowcounts."""
        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    conn.begin()
                    results = []
                    for sql, params in operations:
                        cursor.execute(sql, params)
                        results.append(cursor.rowcount)
                    conn.commit()
                    return results
                except Exception as e:
                    conn.rollback()
                    logging.error(f"事务执行失败: {e}")
                    raise

    def get_with_lock(self, table, conditions, lock_type='FOR UPDATE'):
        """Select one row while taking a row lock.

        FIX: ``lock_type`` used to be formatted into the SQL unchecked -- an
        injection vector if it ever came from user input; it is now validated
        against a whitelist and rejected with ValueError otherwise.
        """
        if lock_type not in self._ALLOWED_LOCKS:
            raise ValueError(f"不支持的锁类型: {lock_type}")
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"SELECT * FROM {table} WHERE {where_clause} {lock_type}"

        with self.db.get_db_connection() as conn:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql, list(conditions.values()))
                return cursor.fetchone()

    def execute_stored_procedure(self, procedure_name, args):
        """Call a stored procedure and commit; return callproc()'s result."""
        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    result = cursor.callproc(procedure_name, args)
                    conn.commit()
                    return result
                except Exception as e:
                    conn.rollback()
                    logging.error(f"存储过程执行失败: {e}")
                    raise

# Batch-insert example: each tuple matches the column list positionally.
advanced = MySQLAdvancedFeatures(db)
batch_data = [
    ('Alice', 'alice@example.com', 25),
    ('Bob', 'bob@example.com', 35),
    ('Charlie', 'charlie@example.com', 28)
]
batch_result = advanced.execute_batch_insert('users', ['name', 'email', 'age'], batch_data)

3. PostgreSQL数据库连接与操作

3.1 基础连接操作

import psycopg2
from psycopg2.extras import RealDictCursor, DictCursor
from contextlib import contextmanager
import logging

class PostgreSQLDatabase:
    """Thin wrapper around psycopg2 for opening PostgreSQL connections."""

    # Config keys forwarded verbatim to psycopg2.connect().
    _CONN_KEYS = (
        'host', 'port', 'user', 'password', 'database',
        'connect_timeout', 'application_name',
    )

    def __init__(self, config):
        self.config = config

    def get_connection(self):
        """Open and return a new psycopg2 connection; log and re-raise on failure."""
        try:
            kwargs = {key: self.config[key] for key in self._CONN_KEYS}
            return psycopg2.connect(**kwargs)
        except Exception as e:
            logging.error(f"PostgreSQL连接失败: {e}")
            raise

    @contextmanager
    def get_db_connection(self):
        """Context manager: yield a connection, roll back on error, always close."""
        conn = None
        try:
            conn = self.get_connection()
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"数据库操作失败: {e}")
            raise
        finally:
            if conn:
                conn.close()

# PostgreSQL CRUD operations
class PostgreSQLCRUD:
    """Generic CRUD helper on top of PostgreSQLDatabase.

    Values are always bound as named query parameters; table/column names
    are interpolated and must therefore come from trusted code.
    """

    def __init__(self, db_instance):
        self.db = db_instance

    def create_record(self, table, data):
        """Insert one row and return its generated ``id`` (via RETURNING)."""
        columns = ', '.join(data.keys())
        placeholders = ', '.join([f'%({key})s' for key in data.keys()])
        sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders}) RETURNING id"

        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    cursor.execute(sql, data)
                    new_id = cursor.fetchone()[0]
                    conn.commit()
                    return new_id
                except Exception as e:
                    conn.rollback()
                    logging.error(f"插入数据失败: {e}")
                    raise

    def read_records(self, table, conditions=None, limit=None, offset=None):
        """Select rows as dicts, optionally filtered and paginated.

        FIX: pagination placeholders are namespaced (``_limit``/``_offset``)
        so a filter column literally named ``limit``/``offset`` can no longer
        clobber them, and ``limit=0`` / ``offset=0`` are honoured.
        """
        sql = f"SELECT * FROM {table}"
        params = {}

        if conditions:
            where_clause = " AND ".join([f"{k} = %({k})s" for k in conditions.keys()])
            sql += f" WHERE {where_clause}"
            params.update(conditions)

        if limit is not None:
            sql += " LIMIT %(_limit)s"
            params['_limit'] = limit
            if offset is not None:
                sql += " OFFSET %(_offset)s"
                params['_offset'] = offset

        with self.db.get_db_connection() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(sql, params)
                return [dict(row) for row in cursor.fetchall()]

    def update_record(self, table, data, conditions):
        """Update matching rows; return the affected-row count.

        FIX: SET and WHERE placeholders are namespaced (``set_*``/``where_*``)
        so the same column may appear in both ``data`` and ``conditions`` --
        the old code merged the two dicts into one namespace, letting the
        condition value silently overwrite the new value in SET.
        """
        set_clause = ", ".join([f"{k} = %(set_{k})s" for k in data.keys()])
        where_clause = " AND ".join([f"{k} = %(where_{k})s" for k in conditions.keys()])
        sql = f"UPDATE {table} SET {set_clause} WHERE {where_clause}"

        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    params = {f"set_{k}": v for k, v in data.items()}
                    params.update({f"where_{k}": v for k, v in conditions.items()})
                    cursor.execute(sql, params)
                    affected_rows = cursor.rowcount
                    conn.commit()
                    return affected_rows
                except Exception as e:
                    conn.rollback()
                    logging.error(f"更新数据失败: {e}")
                    raise

    def delete_record(self, table, conditions):
        """Delete matching rows; return the affected-row count."""
        where_clause = " AND ".join([f"{k} = %({k})s" for k in conditions.keys()])
        sql = f"DELETE FROM {table} WHERE {where_clause}"

        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    cursor.execute(sql, conditions)
                    affected_rows = cursor.rowcount
                    conn.commit()
                    return affected_rows
                except Exception as e:
                    conn.rollback()
                    logging.error(f"删除数据失败: {e}")
                    raise

# PostgreSQL advanced features
class PostgreSQLAdvancedFeatures:
    """Batched inserts, UPSERT, JSON lookups and CTE queries for PostgreSQL."""

    def __init__(self, db_instance):
        self.db = db_instance

    def execute_batch_insert(self, table, columns, data_list):
        """Batch-insert using psycopg2's execute_batch().

        ``data_list`` must be a sequence of dicts keyed by ``columns``
        (the placeholders are *named*, not positional).
        FIX: the old docstring claimed executemany(); an unused
        ``execute_values`` import was also dropped.
        """
        from psycopg2.extras import execute_batch

        placeholders = ', '.join([f'%({col})s' for col in columns])
        sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"

        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    execute_batch(cursor, sql, data_list)
                    conn.commit()
                    return cursor.rowcount
                except Exception as e:
                    conn.rollback()
                    logging.error(f"批量插入失败: {e}")
                    raise

    def execute_upsert(self, table, data, conflict_columns, update_columns):
        """INSERT ... ON CONFLICT DO UPDATE; return the affected row's id."""
        columns = ', '.join(data.keys())
        placeholders = ', '.join([f'%({key})s' for key in data.keys()])
        conflict_clause = ', '.join(conflict_columns)
        update_clause = ', '.join([f"{col} = EXCLUDED.{col}" for col in update_columns])

        sql = f"""
        INSERT INTO {table} ({columns}) VALUES ({placeholders})
        ON CONFLICT ({conflict_clause}) 
        DO UPDATE SET {update_clause}
        RETURNING id
        """

        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    cursor.execute(sql, data)
                    new_id = cursor.fetchone()[0]
                    conn.commit()
                    return new_id
                except Exception as e:
                    conn.rollback()
                    logging.error(f"UPSERT操作失败: {e}")
                    raise

    def execute_json_operations(self, table, json_conditions):
        """Fetch rows where ``data->>'key'`` equals the given value; rows as dicts."""
        sql = f"""
        SELECT * FROM {table} 
        WHERE data->>%s = %s
        """

        with self.db.get_db_connection() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(sql, [json_conditions['key'], json_conditions['value']])
                return [dict(row) for row in cursor.fetchall()]

    def execute_cte_query(self, cte_query):
        """Run a trusted CTE (WITH ...) query; return rows as dicts."""
        with self.db.get_db_connection() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(cte_query)
                return [dict(row) for row in cursor.fetchall()]

# Usage example: wire up the PostgreSQL helpers against a local dev database.
pg_config = {
    'host': 'localhost',
    'port': 5432,
    'user': 'postgres',
    'password': 'password',
    'database': 'test_db',
    'connect_timeout': 10,           # seconds
    'application_name': 'PythonApp'  # client label reported to the server
}

pg_db = PostgreSQLDatabase(pg_config)
pg_crud = PostgreSQLCRUD(pg_db)
pg_advanced = PostgreSQLAdvancedFeatures(pg_db)

3.2 PostgreSQL特有功能

class PostgreSQLSpecificFeatures:
    """PostgreSQL-only features: array columns, JSON, window functions, full-text search."""

    def __init__(self, db_instance):
        self.db = db_instance

    def handle_arrays(self, table, array_data):
        """Insert a row with an array-typed column.

        FIX: the ``table`` parameter was previously ignored (the statement
        was hard-coded to ``products``); it is now interpolated like in the
        other helper classes. The column names remain fixed to (name, tags),
        and ``array_data`` must provide those keys.
        """
        sql = f"""
        INSERT INTO {table} (name, tags)
        VALUES (%(name)s, %(tags)s)
        """

        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, array_data)
            conn.commit()

    def handle_json(self, table, json_data):
        """Insert a row with a JSON/JSONB column.

        FIX: honours the ``table`` parameter (was hard-coded to
        ``user_profiles``). Columns remain fixed to (user_id, profile_data).
        """
        sql = f"""
        INSERT INTO {table} (user_id, profile_data)
        VALUES (%(user_id)s, %(profile_data)s)
        """

        with self.db.get_db_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, json_data)
            conn.commit()

    def execute_window_functions(self, query):
        """Run an arbitrary (trusted) window-function query; return rows as dicts."""
        with self.db.get_db_connection() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(query)
                return [dict(row) for row in cursor.fetchall()]

    def execute_full_text_search(self, table, search_term):
        """Full-text search over ``search_vector``, ranked by ts_rank."""
        sql = f"""
        SELECT *, ts_rank(search_vector, plainto_tsquery(%s)) as rank
        FROM {table}
        WHERE search_vector @@ plainto_tsquery(%s)
        ORDER BY rank DESC
        """

        with self.db.get_db_connection() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(sql, [search_term, search_term])
                return [dict(row) for row in cursor.fetchall()]

4. 连接池管理

4.1 MySQL连接池

# NOTE(review): DBUtils 2.x renamed this module -- there the import is
# ``from dbutils.pooled_db import PooledDB``; the path below matches 1.x.
from DBUtils.PooledDB import PooledDB
import pymysql

class MySQLConnectionPool:
    """pymysql connection pool backed by DBUtils' PooledDB."""

    def __init__(self, config, pool_size=10):
        pool_options = {
            'creator': pymysql,        # driver module used to create connections
            'maxconnections': pool_size,
            'mincached': 2,            # idle connections opened at startup
            'maxcached': 5,            # max idle connections kept around
            'maxshared': 3,
            'blocking': True,          # wait instead of failing when exhausted
            'maxusage': None,          # unlimited reuse per connection
            'setsession': [],
            'ping': 0,                 # never ping before handing out
        }
        self.pool = PooledDB(
            host=config['host'],
            port=config['port'],
            user=config['user'],
            password=config['password'],
            database=config['database'],
            charset=config['charset'],
            **pool_options,
        )

    def get_connection(self):
        """Borrow a connection from the pool."""
        return self.pool.connection()

    @contextmanager
    def get_db_connection(self):
        """Borrow a connection, roll back on error, and release it on exit."""
        conn = None
        try:
            conn = self.get_connection()
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"数据库操作失败: {e}")
            raise
        finally:
            if conn:
                conn.close()

# Pool usage example (note: pool configs omit the timeout keys used earlier).
mysql_pool_config = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'password',
    'database': 'test_db',
    'charset': 'utf8mb4'
}

mysql_pool = MySQLConnectionPool(mysql_pool_config, pool_size=20)

4.2 PostgreSQL连接池

from psycopg2 import pool
import threading

class PostgreSQLConnectionPool:
    """Thread-safe psycopg2 connection pool."""

    def __init__(self, config, minconn=1, maxconn=20):
        # FIX: use the `pool` name imported above (`from psycopg2 import pool`);
        # the old `psycopg2.pool.…` spelling only worked because psycopg2
        # happened to be imported elsewhere in this guide.
        self.connection_pool = pool.ThreadedConnectionPool(
            minconn=minconn,
            maxconn=maxconn,
            host=config['host'],
            port=config['port'],
            user=config['user'],
            password=config['password'],
            database=config['database'],
            connect_timeout=config['connect_timeout']
        )
        # NOTE(review): this lock is currently unused -- ThreadedConnectionPool
        # is internally synchronized. Kept for interface compatibility.
        self.lock = threading.Lock()

    def get_connection(self):
        """Borrow a connection from the pool."""
        return self.connection_pool.getconn()

    def put_connection(self, conn):
        """Return a connection to the pool."""
        self.connection_pool.putconn(conn)

    @contextmanager
    def get_db_connection(self):
        """Borrow a connection, roll back on error, always return it to the pool."""
        conn = None
        try:
            conn = self.get_connection()
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            logging.error(f"数据库操作失败: {e}")
            raise
        finally:
            if conn:
                self.put_connection(conn)

# PostgreSQL pool usage example.
pg_pool_config = {
    'host': 'localhost',
    'port': 5432,
    'user': 'postgres',
    'password': 'password',
    'database': 'test_db',
    'connect_timeout': 10
}

pg_pool = PostgreSQLConnectionPool(pg_pool_config, minconn=2, maxconn=20)

5. ORM框架使用

5.1 SQLAlchemy基础使用

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import os

Base = declarative_base()  # shared declarative base for all ORM models

class User(Base):
    """ORM model for the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(100), nullable=False)
    email = Column(String(100), unique=True, nullable=False)  # enforced unique
    age = Column(Integer)
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12 and these
    # defaults produce naive (tz-less) timestamps; consider timezone-aware ones.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class DatabaseManager:
    """Owns the SQLAlchemy engine and session factory for one database URL."""

    def __init__(self, database_url):
        # Pooling: 10 persistent + up to 20 overflow connections; connections
        # are validated before use (pre_ping) and recycled hourly.
        engine_options = dict(
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,
            pool_recycle=3600,
        )
        self.engine = create_engine(database_url, **engine_options)
        self.SessionLocal = sessionmaker(bind=self.engine)

    def create_tables(self):
        """Create all tables registered on Base.metadata (idempotent)."""
        Base.metadata.create_all(bind=self.engine)

    @contextmanager
    def get_db_session(self):
        """Yield a session; commit on success, roll back and re-raise on error."""
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            logging.error(f"数据库操作失败: {e}")
            raise
        finally:
            session.close()

# MySQL SQLAlchemy URL (PyMySQL driver; utf8mb4 for full Unicode support).
mysql_db_url = "mysql+pymysql://username:password@localhost:3306/database_name?charset=utf8mb4"

# PostgreSQL SQLAlchemy URL (psycopg2 driver).
pg_db_url = "postgresql+psycopg2://username:password@localhost:5432/database_name"

# Build the manager and create tables at import time (example only --
# real applications should run schema creation from an explicit setup step).
db_manager = DatabaseManager(mysql_db_url)
db_manager.create_tables()

# SQLAlchemy CRUD operations
class SQLAlchemyCRUD:
    """CRUD operations for the User model via SQLAlchemy sessions.

    NOTE(review): get_user/get_users return ORM instances whose session has
    been committed and closed on exit -- later attribute access may raise
    DetachedInstanceError depending on expire_on_commit; confirm callers.
    """

    def __init__(self, db_manager):
        self.db_manager = db_manager

    def create_user(self, name, email, age):
        """Insert a new user and return its primary key."""
        with self.db_manager.get_db_session() as session:
            new_user = User(name=name, email=email, age=age)
            session.add(new_user)
            session.flush()  # assigns the id without committing yet
            return new_user.id

    def get_user(self, user_id):
        """Return the user with ``user_id``, or None."""
        with self.db_manager.get_db_session() as session:
            query = session.query(User).filter(User.id == user_id)
            return query.first()

    def get_users(self, skip=0, limit=10):
        """Return a page of users (offset/limit pagination)."""
        with self.db_manager.get_db_session() as session:
            page = session.query(User).offset(skip).limit(limit)
            return page.all()

    def update_user(self, user_id, **kwargs):
        """Set the given attributes on the user; True if it existed."""
        with self.db_manager.get_db_session() as session:
            target = session.query(User).filter(User.id == user_id).first()
            if target is None:
                return False
            for attr, value in kwargs.items():
                setattr(target, attr, value)
            return True

    def delete_user(self, user_id):
        """Delete the user; True if it existed."""
        with self.db_manager.get_db_session() as session:
            target = session.query(User).filter(User.id == user_id).first()
            if target is None:
                return False
            session.delete(target)
            return True

# Usage: create a user, then load it back by primary key.
sqlalchemy_crud = SQLAlchemyCRUD(db_manager)
user_id = sqlalchemy_crud.create_user("John Doe", "john@example.com", 30)
user = sqlalchemy_crud.get_user(user_id)

5.2 SQLAlchemy高级特性

from sqlalchemy import func, and_, or_, desc, asc
from sqlalchemy.orm import joinedload, selectinload

class SQLAlchemyAdvanced:
    """Aggregate, filtered, ordered and bulk query examples for the User model."""

    def __init__(self, db_manager):
        self.db_manager = db_manager

    def complex_query(self):
        """Run an aggregate, a filtered and an ordered query; return all three."""
        with self.db_manager.get_db_session() as session:
            # Aggregates: user count, average age, newest signup timestamp.
            stats = session.query(
                func.count(User.id).label('total_users'),
                func.avg(User.age).label('avg_age'),
                func.max(User.created_at).label('latest_user')
            ).first()

            # Users under 30 created after 2023-01-01.
            young_users = session.query(User).filter(
                and_(
                    User.age < 30,
                    User.created_at > datetime(2023, 1, 1)
                )
            ).all()

            # Top 10, ordered by age descending then name ascending.
            ordered_users = (
                session.query(User)
                .order_by(desc(User.age), asc(User.name))
                .limit(10)
                .all()
            )

            return stats, young_users, ordered_users

    def bulk_operations(self, users_data):
        """Bulk-insert rows, clamp ages below 18 up to 18, delete ages over 100."""
        with self.db_manager.get_db_session() as session:
            session.bulk_save_objects([User(**row) for row in users_data])
            session.query(User).filter(User.age < 18).update({User.age: 18})
            session.query(User).filter(User.age > 100).delete()

    def relationship_query(self):
        """Placeholder for relationship (joined-load) queries."""
        # e.g. session.query(User).options(joinedload(User.orders)).all()
        pass

6. 性能优化技巧

6.1 查询优化

class PerformanceOptimization:
    """Query-level performance helpers: pagination plus placeholder techniques."""

    def __init__(self, db_instance):
        self.db = db_instance
    
    def optimized_pagination(self, table, page, page_size, filters=None):
        """Paginate with ORDER BY id + LIMIT/OFFSET (``page`` is 1-based).

        NOTE(review): despite the original comment, this is plain OFFSET
        pagination -- deep pages still scan all skipped rows. True cursor
        (keyset) pagination would filter on the last-seen id instead.
        """
        offset = (page - 1) * page_size
        
        # Build WHERE from equality filters; values are bound as parameters.
        if filters:
            where_clause = " AND ".join([f"{k} = %s" for k in filters.keys()])
            sql = f"""
            SELECT * FROM {table} 
            WHERE {where_clause}
            ORDER BY id 
            LIMIT %s OFFSET %s
            """
            params = list(filters.values()) + [page_size, offset]
        else:
            sql = f"SELECT * FROM {table} ORDER BY id LIMIT %s OFFSET %s"
            params = [page_size, offset]
        
        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return cursor.fetchall()
    
    def prepared_statement_example(self):
        """Placeholder: prepared statements would be enabled at the connection level."""
        pass
    
    def connection_keep_alive(self):
        """Placeholder: keep-alive is configured via driver/server parameters."""
        pass

# Index-optimization examples
def create_optimized_indexes():
    """Return (mysql_indexes, pg_indexes): example DDL for common lookup paths."""
    mysql_indexes = [
        "CREATE INDEX idx_users_email ON users(email)",
        "CREATE INDEX idx_users_age_created ON users(age, created_at)",
        "CREATE INDEX idx_orders_user_status ON orders(user_id, status)",
    ]

    # CONCURRENTLY builds the index without blocking writes (PostgreSQL-only).
    pg_indexes = mysql_indexes[:2] + [
        "CREATE INDEX CONCURRENTLY idx_large_table_col ON large_table(column_name)",
    ]

    return mysql_indexes, pg_indexes

6.2 连接和资源管理

import asyncio
import aiomysql
import asyncpg
from contextlib import asynccontextmanager

class AsyncDatabaseManager:
    """Async connection-pool manager for aiomysql or asyncpg.

    ``db_type`` must be 'mysql' (aiomysql) or 'postgresql' (asyncpg);
    ``config`` is passed through to the driver's create_pool().
    """

    def __init__(self, db_type, config):
        self.db_type = db_type
        self.config = config
        self.pool = None

    async def initialize_pool(self):
        """Create the driver-specific connection pool."""
        if self.db_type == 'mysql':
            self.pool = await aiomysql.create_pool(**self.config)
        elif self.db_type == 'postgresql':
            self.pool = await asyncpg.create_pool(**self.config)

    @asynccontextmanager
    async def get_connection(self):
        """Acquire a pooled connection for the duration of the block."""
        async with self.pool.acquire() as conn:
            yield conn

    async def close_pool(self):
        """Close the pool, waiting for connections to be released.

        FIX: the two drivers differ -- aiomysql's ``close()`` is synchronous
        and must be followed by ``await wait_closed()``, while asyncpg's
        ``Pool.close()`` is itself a coroutine with no ``wait_closed()``.
        The old code called the aiomysql sequence unconditionally, which
        breaks for asyncpg pools.
        """
        if not self.pool:
            return
        if self.db_type == 'postgresql':
            await self.pool.close()
        else:
            self.pool.close()
            await self.pool.wait_closed()

# Connection-pool best practices
class BestPractices:
    """Reference configuration values for pooling, timeouts and error handling."""

    def __init__(self):
        # Baseline timeouts (seconds) shared by the examples in this guide.
        self.timeout_settings = {
            'connect_timeout': 10,
            'read_timeout': 30,
            'write_timeout': 30,
            'command_timeout': 60,
        }

    def connection_configuration(self):
        """Return a recommended connection/pool configuration dict."""
        pool_settings = {
            'pool_size': 10,
            'max_overflow': 20,
            'pool_timeout': 30,
            'pool_recycle': 3600,   # recycle connections after 1 hour
            'pool_pre_ping': True,  # validate connections before use
        }
        timeout_settings = {
            'connect_timeout': 10,
            'read_timeout': 30,
            'write_timeout': 30,
        }
        misc_settings = {
            'charset': 'utf8mb4',
            'autocommit': False,
            'sql_mode': 'STRICT_TRANS_TABLES',
        }
        return {**pool_settings, **timeout_settings, **misc_settings}

    def error_handling_strategy(self):
        """Return the error-handling features an application should enable."""
        return {
            'retry_logic': True,
            'circuit_breaker': True,
            'fallback_methods': True,
            'logging': True,
        }

7. 安全最佳实践

7.1 SQL注入防护

class SecurityBestPractices:
    """SQL-injection defences: identifier whitelists plus parameterized values."""

    # The only identifiers this helper will ever interpolate into SQL text.
    _ALLOWED_TABLES = ('users', 'orders', 'products')
    _ALLOWED_COLUMNS = {
        'users': ['id', 'name', 'email', 'age'],
        'orders': ['id', 'user_id', 'status', 'amount'],
        'products': ['id', 'name', 'price', 'category']
    }

    def __init__(self, db_instance):
        self.db = db_instance

    def safe_query_with_validation(self, table, conditions):
        """Whitelist-validate identifiers, then run a parameterized SELECT.

        Raises ValueError for any table or column not on the whitelist.
        """
        if table not in self._ALLOWED_TABLES:
            raise ValueError(f"不允许的表名: {table}")

        allowed = self._ALLOWED_COLUMNS.get(table, [])
        for col in conditions.keys():
            if col not in allowed:
                raise ValueError(f"不允许的列名: {col}")

        # Values are always bound as parameters, never concatenated.
        where_clause = " AND ".join([f"{k} = %s" for k in conditions.keys()])
        sql = f"SELECT * FROM {table} WHERE {where_clause}"

        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, list(conditions.values()))
            return cursor.fetchall()

    def input_sanitization(self, user_input):
        """Strip SQL keywords and comments from free text.

        NOTE(review): blacklist filtering is defence-in-depth only -- it is
        easy to bypass and must never replace parameterized queries.
        FIX: block comments now match across newlines (re.DOTALL); the old
        pattern left multi-line /* ... */ comments untouched.
        """
        import re

        sanitized = re.sub(r'\b(SELECT|INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|EXEC|UNION)\b', '',
                           user_input, flags=re.IGNORECASE)

        # Strip /* ... */ (including multi-line) and -- line comments.
        sanitized = re.sub(r'/\*.*?\*/', '', sanitized, flags=re.DOTALL)
        sanitized = re.sub(r'--.*', '', sanitized)

        return sanitized.strip()

    def use_parameterized_queries(self, table, data):
        """Insert using parameterized values (the safe pattern).

        Never build the VALUES clause by string formatting/concatenation --
        that is the classic SQL-injection mistake.
        """
        columns = ', '.join(data.keys())
        placeholders = ', '.join(['%s'] * len(data))
        sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

        with self.db.get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, list(data.values()))
            return cursor.rowcount

7.2 连接安全

def secure_connection_config():
    """Return (mysql_config, pg_config): hardened example connection settings."""
    ssl_files = {
        'ssl_ca': '/path/to/ca.pem',
        'ssl_cert': '/path/to/client-cert.pem',
        'ssl_key': '/path/to/client-key.pem'
    }

    secure_mysql_config = {
        'host': 'localhost',
        'port': 3306,
        'user': 'app_user',
        'password': 'secure_password',  # load from an environment variable in real code
        'database': 'app_db',
        'charset': 'utf8mb4',
        'ssl': ssl_files,  # mutual-TLS client certificate material
        'autocommit': False,
        'connect_timeout': 10
    }

    secure_pg_config = {
        'host': 'localhost',
        'port': 5432,
        'user': 'app_user',
        'password': 'secure_password',  # load from an environment variable in real code
        'database': 'app_db',
        'sslmode': 'require',  # or 'verify-full' to also verify the server cert
        'connect_timeout': 10,
        'application_name': 'SecureApp'
    }

    return secure_mysql_config, secure_pg_config

# Environment-variable based configuration
import os
from dotenv import load_dotenv  # third-party: python-dotenv

# Populate os.environ from a local .env file (no-op if the file is absent).
load_dotenv()

DB_CONFIG = {
    'mysql': {
        'host': os.getenv('MYSQL_HOST', 'localhost'),
        'port': int(os.getenv('MYSQL_PORT', 3306)),
        'user': os.getenv('MYSQL_USER'),
        'password': os.getenv('MYSQL_PASSWORD'),
        'database': os.getenv('MYSQL_DATABASE'),
        'charset': 'utf8mb4'
    },
    'postgresql': {
        'host': os.getenv('PG_HOST', 'localhost'),
        'port': int(os.getenv('PG_PORT', 5432)),
        'user': os.getenv('PG_USER'),
        'password': os.getenv('PG_PASSWORD'),
        'database': os.getenv('PG_DATABASE')
    }
}

8. 监控和调试

8.1 性能监控

import time
import functools
from typing import Callable, Any

def monitor_db_performance(func: Callable) -> Callable:
    """Decorator that logs duration and success of a DB operation.

    Every call is logged at INFO; calls slower than 1 second are also
    flagged at WARNING.  Exceptions are timed, logged, and re-raised
    untouched.

    Args:
        func: the database operation to instrument.

    Returns:
        A wrapper with the same signature and behavior as *func*.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic: immune to wall-clock adjustments
        # mid-call (time.time could even yield negative durations).
        start_time = time.perf_counter()
        # Initialize before the try so `finally` can always read it --
        # the original left `success` unbound if a BaseException
        # (e.g. KeyboardInterrupt) escaped, causing a NameError.
        success = False
        try:
            result = func(*args, **kwargs)
            success = True
            return result
        finally:
            duration = time.perf_counter() - start_time

            # Record per-call performance metrics.
            logging.info(f"DB Operation: {func.__name__}, "
                         f"Duration: {duration:.4f}s, "
                         f"Success: {success}")

            # Slow-query threshold: 1 second.
            if duration > 1.0:
                logging.warning(f"Slow query detected: {func.__name__}, "
                                f"Duration: {duration:.4f}s")
    return wrapper

class DatabaseMonitoring:
    """Counts queries and errors while delegating timing to the
    monitor_db_performance decorator."""

    def __init__(self):
        # Counters for external monitoring; slow_queries is a placeholder
        # list for future slow-query capture.
        self.query_count = 0
        self.slow_queries = []
        self.error_count = 0

    @monitor_db_performance
    def monitored_query(self, connection, query, params=None):
        """Execute *query*, return fetchall(), and update the counters."""
        cur = connection.cursor()
        try:
            cur.execute(query, params)
            rows = cur.fetchall()
        except Exception:
            self.error_count += 1
            raise
        else:
            self.query_count += 1
            return rows
        finally:
            cur.close()

# 查询分析
def explain_query_performance(connection, query, params=None):
    """Print and return the execution plan for *query*.

    Works with DB-API connections whose dialect supports ``EXPLAIN``
    (MySQL, PostgreSQL, SQLite).

    Args:
        connection: open DB-API connection.
        query: SQL statement to analyze.
        params: optional bind parameters for the statement.

    Returns:
        list: the rows of the EXPLAIN output.
    """
    cursor = connection.cursor()
    try:
        explain_sql = f"EXPLAIN {query}"
        # Some drivers (e.g. sqlite3) reject a None parameter container,
        # so only pass params when the caller actually supplied them.
        if params is None:
            cursor.execute(explain_sql)
        else:
            cursor.execute(explain_sql, params)
        explain_result = cursor.fetchall()

        print("Query Execution Plan:")
        for row in explain_result:
            print(row)

        return explain_result
    finally:
        # Fixed: the cursor leaked whenever EXPLAIN raised.
        cursor.close()

8.2 日志记录

import logging
import json
from datetime import datetime

class DatabaseLogger:
    """Structured (JSON-per-line) file logger for database operations."""

    def __init__(self, logger_name='db_logger'):
        """Attach a file handler writing to database_operations.log.

        Guards against duplicate handlers: the original added a new
        FileHandler on every construction, so a second DatabaseLogger
        with the same name duplicated every log line.
        """
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)

        if not self.logger.handlers:
            handler = logging.FileHandler('database_operations.log')
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    @staticmethod
    def _stable_query_hash(query):
        """Process-independent digest used to group similar queries.

        Builtin hash() is randomized per process (PYTHONHASHSEED), so it
        cannot correlate queries across runs; sha1 of the text can.
        """
        import hashlib
        return hashlib.sha1(query.encode('utf-8')).hexdigest()

    def log_query(self, query, params, duration, connection_info):
        """Log one executed query with timing and connection metadata."""
        log_data = {
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'params': params,
            'duration': duration,
            'connection_info': connection_info,
            'query_hash': self._stable_query_hash(query)
        }

        # default=str keeps logging from crashing on non-JSON-serializable
        # params (datetimes, Decimals, bytes, ...).
        self.logger.info(json.dumps(log_data, default=str))

    def log_slow_query(self, query, params, duration, threshold=1.0):
        """Warn when *duration* exceeds *threshold* seconds."""
        if duration > threshold:
            self.logger.warning(f"SLOW QUERY (>{threshold}s): {query[:100]}...")

    def log_error(self, query, params, error):
        """Log a failed query together with the exception details."""
        error_data = {
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'params': params,
            'error': str(error),
            'error_type': type(error).__name__
        }

        self.logger.error(json.dumps(error_data, default=str))

# Usage example: one module-level logger shared by the helper below.
db_logger = DatabaseLogger()

def execute_with_logging(connection, query, params=None):
    """Execute *query*, logging timing, slow queries, and errors.

    Args:
        connection: open DB-API connection. The ``host``/``db`` attributes
            are read defensively -- they are pymysql-specific and other
            drivers may not expose them.
        query: SQL to execute.
        params: optional bind parameters.

    Returns:
        list: cursor.fetchall() result.
    """
    start_time = time.time()
    cursor = None

    try:
        cursor = connection.cursor()
        cursor.execute(query, params)
        result = cursor.fetchall()

        duration = time.time() - start_time
        db_logger.log_query(query, params, duration,
                            {'host': getattr(connection, 'host', None),
                             'database': getattr(connection, 'db', None)})
        db_logger.log_slow_query(query, params, duration)

        return result
    except Exception as e:
        db_logger.log_error(query, params, e)
        raise
    finally:
        # Fixed: the cursor was never closed (leaked on every call).
        if cursor is not None:
            cursor.close()

9. 实际应用示例

9.1 Web应用集成

from flask import Flask, request, jsonify
from contextlib import contextmanager

app = Flask(__name__)

class WebAppDatabase:
    """Thin wrapper giving web handlers transactional DB sessions."""

    def __init__(self, db_manager):
        self.db_manager = db_manager

    @contextmanager
    def get_db_session(self):
        """Yield a session; commit on success, roll back on any error.

        The session is closed in every case so its connection returns
        to the pool.
        """
        db_session = self.db_manager.SessionLocal()
        try:
            yield db_session
            db_session.commit()
        except Exception:
            db_session.rollback()
            raise
        finally:
            db_session.close()

# Flask route example
@app.route('/users', methods=['GET'])
def get_users():
    """Return one page of users as JSON (?page=&per_page=)."""
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    with app.db_manager.get_db_session() as session:
        offset = (page - 1) * per_page
        page_of_users = (
            session.query(User)
            .offset(offset)
            .limit(per_page)
            .all()
        )

        payload = {
            'users': [
                {'id': u.id, 'name': u.name, 'email': u.email}
                for u in page_of_users
            ],
            'page': page,
            'per_page': per_page
        }
        return jsonify(payload)

@app.route('/users', methods=['POST'])
def create_user():
    """Create a user from the JSON request body; respond 201 with its id."""
    data = request.get_json()

    with app.db_manager.get_db_session() as session:
        user = User(name=data['name'], email=data['email'], age=data['age'])
        session.add(user)
        # flush() sends the INSERT and assigns the primary key without
        # committing -- the commit belongs to get_db_session.  The
        # original called session.commit() here, making the context
        # manager's commit a redundant second commit.
        session.flush()

        return jsonify({'id': user.id}), 201

# Global error handling
@app.errorhandler(Exception)
def handle_exception(e):
    """Translate any unhandled exception into a JSON 500 response.

    The original called app.db_manager.SessionLocal().rollback(), which
    creates a brand-new session, rolls back nothing, and leaks it -- the
    failing request's own session is already rolled back and closed by
    get_db_session.  Here we just record the failure and report it.
    """
    logging.exception("Unhandled exception in request: %s", e)
    return jsonify({'error': str(e)}), 500

9.2 数据迁移脚本

import csv
import json
from typing import List, Dict, Any

class DataMigration:
    """Moves table data between two databases and to/from CSV files."""

    def __init__(self, source_db, target_db):
        """
        Args:
            source_db: object exposing get_db_connection() as a context
                manager yielding a DB-API connection (read side).
            target_db: same interface, used for writes.
        """
        self.source_db = source_db
        self.target_db = target_db

    def migrate_table(self, table_name: str, batch_size: int = 1000):
        """Copy all rows of *table_name* from source to target in batches.

        NOTE(review): *table_name* is interpolated into the SQL -- it must
        come from trusted code, never from user input.
        """
        with self.source_db.get_db_connection() as source_conn:
            source_cursor = source_conn.cursor()
            try:
                source_cursor.execute(f"SELECT * FROM {table_name}")
                # Column names are invariant -- hoisted out of the loop
                # (the original recomputed them for every batch).
                columns = [desc[0] for desc in source_cursor.description]

                # Stream in fixed-size batches so huge tables never have
                # to fit in memory at once.
                while True:
                    rows = source_cursor.fetchmany(batch_size)
                    if not rows:
                        break

                    data_list = [dict(zip(columns, row)) for row in rows]
                    self.insert_batch(target_table=table_name,
                                      data_list=data_list)
            finally:
                source_cursor.close()  # fixed: cursor was leaked

    def export_to_csv(self, table_name: str, filename: str):
        """Dump *table_name* to *filename* as UTF-8 CSV with a header row."""
        with self.source_db.get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(f"SELECT * FROM {table_name}")
                columns = [desc[0] for desc in cursor.description]

                with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
                    writer = csv.writer(csvfile)
                    writer.writerow(columns)  # header row
                    for row in cursor:
                        writer.writerow(row)
            finally:
                cursor.close()  # fixed: cursor was leaked

    def import_from_csv(self, table_name: str, filename: str):
        """Load rows from a CSV file (with header) into *table_name*.

        All values arrive as strings from csv.DictReader; the database
        is left to coerce them to the column types.
        """
        with open(filename, 'r', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)
            data_list = list(reader)

            self.insert_batch(table_name, data_list)

    def insert_batch(self, target_table: str, data_list: List[Dict[str, Any]]):
        """Insert *data_list* (column->value dicts) in one executemany call.

        Assumes every dict has the same keys as the first one.
        """
        if not data_list:
            return  # nothing to do; avoids building SQL from an empty dict

        with self.target_db.get_db_connection() as conn:
            cursor = conn.cursor()
            try:
                columns = list(data_list[0].keys())
                placeholders = ', '.join(['%s'] * len(columns))
                sql = (f"INSERT INTO {target_table} "
                       f"({', '.join(columns)}) VALUES ({placeholders})")

                values_list = [tuple(row[col] for col in columns)
                               for row in data_list]

                cursor.executemany(sql, values_list)
                conn.commit()
            finally:
                cursor.close()  # fixed: cursor was leaked

# Usage example: copy `users` from MySQL to PostgreSQL in batches of 500.
# NOTE(review): assumes mysql_db / pg_db were constructed earlier and expose
# the get_db_connection() context-manager interface DataMigration expects.
migration = DataMigration(source_db=mysql_db, target_db=pg_db)
migration.migrate_table('users', batch_size=500)

10. 常见问题与解决方案

10.1 连接问题

class ConnectionTroubleshooting:
    """Helpers for diagnosing and surviving database connection failures."""

    def __init__(self):
        # Ordered manual checklist shown to operators (kept in Chinese,
        # matching the article's audience).
        self.checklist = [
            "检查数据库服务是否运行",
            "验证连接参数(主机、端口、用户名、密码)",
            "检查防火墙设置",
            "验证网络连通性",
            "检查数据库用户权限",
            "查看数据库日志"
        ]

    def test_connection(self, config):
        """Open a connection described by *config* and probe it with SELECT 1.

        config['database_type'] selects the driver ('mysql' -> pymysql,
        anything else -> psycopg2); that key is stripped before the
        remaining items are passed to the driver as keyword arguments.

        Returns:
            bool: True when the probe query returned a row, False on any
            failure (the error is logged).
        """
        conn = None
        try:
            driver_kwargs = {k: v for k, v in config.items()
                             if k != 'database_type'}
            if config.get('database_type') == 'mysql':
                import pymysql
                conn = pymysql.connect(**driver_kwargs)
            else:
                import psycopg2
                conn = psycopg2.connect(**driver_kwargs)

            cursor = conn.cursor()
            try:
                cursor.execute("SELECT 1")
                result = cursor.fetchone()
            finally:
                cursor.close()  # fixed: cursor was never closed

            return result is not None
        except Exception as e:
            logging.error(f"连接测试失败: {e}")
            return False
        finally:
            # Fixed: the connection leaked whenever any step after the
            # connect itself failed.
            if conn is not None:
                conn.close()

    def connection_retry_mechanism(self, connect_func, max_retries=3, delay=1):
        """Call *connect_func* with exponential backoff.

        Args:
            connect_func: zero-argument callable that returns a connection.
            max_retries: total attempts before giving up.
            delay: initial sleep in seconds; doubled after each failure.

        Returns:
            Whatever connect_func returns on the first success.

        Raises:
            The last exception when every attempt fails (with its
            original traceback -- the original's ``raise e`` rebound it).
        """
        import time

        for attempt in range(max_retries):
            try:
                return connect_func()
            except Exception:
                if attempt == max_retries - 1:
                    raise  # bare raise preserves the original traceback

                logging.warning(f"连接失败,{delay}秒后重试... (尝试 {attempt + 1}/{max_retries})")
                time.sleep(delay)
                delay *= 2  # exponential backoff

10.2 性能问题

class PerformanceTroubleshooting:
    """Reference SQL and remedies for diagnosing slow databases."""

    def __init__(self):
        pass

    def identify_slow_queries(self, db_instance):
        """Return (mysql_sql, postgres_sql) probes for statements active >60s.

        *db_instance* is accepted for interface symmetry but unused --
        the caller executes the returned SQL itself.
        """
        mysql_slow_query = """
        SELECT * FROM information_schema.processlist 
        WHERE TIME > 60 AND COMMAND != 'Sleep'
        """

        pg_slow_query = """
        SELECT pid, now() - pg_stat_activity.query_start AS duration, 
               query, state 
        FROM pg_stat_activity 
        WHERE (now() - pg_stat_activity.query_start) > interval '60 seconds'
        AND state = 'active'
        """

        return mysql_slow_query, pg_slow_query

    def optimize_common_issues(self):
        """Map each common performance problem to its standard remedy."""
        return {
            "missing_indexes": "CREATE INDEX ON table(column)",
            "inefficient_joins": "Add proper indexes on join columns",
            "full_table_scans": "Add WHERE clause indexes",
            "large_result_sets": "Implement pagination",
            "n_plus_one_queries": "Use JOINs or eager loading"
        }

最佳实践要点

使用连接池管理数据库连接,采用参数化查询防止SQL注入,定期监控数据库性能,合理设计索引以优化查询效率。

总结

Python接入MySQL和PostgreSQL数据库是现代Web开发的重要技能:

  • MySQL:适合Web应用,连接简单,社区支持广泛
  • PostgreSQL:功能丰富,支持高级数据类型,适合复杂应用

无论选择哪种数据库,都应该:

  1. 使用连接池:提高性能和资源利用率
  2. 参数化查询:防止SQL注入攻击
  3. 事务管理:确保数据一致性
  4. 错误处理:优雅处理异常情况
  5. 性能监控:持续优化查询性能
  6. 安全配置:保护数据库连接安全

通过合理的设计和实现,可以构建高效、安全、可维护的数据库应用系统。