Python多线程编程指南

并发编程是提升Python应用性能的常见手段,其中多线程因为创建开销小、数据共享便捷、易用性高,在I/O密集型场景下特别吃香(比如爬虫、数据库操作、网络请求)。这篇文章会带你快速梳理Python多线程的核心知识点,从基础用法、常见坑点、GIL限制到最佳实践,代码高亮,段落分明,3000字内搞定~


1. 线程基础

Python通过标准库threading提供多线程支持(底层封装了系统线程接口:Windows下用Win32线程,Linux/macOS下用pthread)。每个进程默认会有一个主线程(MainThread),我们可以通过它创建额外的子线程来执行并发任务。

1.1 创建线程的两种方式

最常用的是函数式创建,代码简洁直观;如果需要更细粒度的控制(比如复用线程类),可以用继承类创建

函数式创建(推荐入门用)

import threading
import time

def worker():
    """子线程要做的事:假装忙一会儿(sleep模拟真实I/O)"""
    print(f'✅ 子线程 {threading.current_thread().name} 启动...')
    time.sleep(1)
    print(f'❌ 子线程 {threading.current_thread().name} 结束')

# 创建线程对象:target是要执行的函数,name给线程起个便于调试的名字
t = threading.Thread(target=worker, name='TestWorker')
t.start()  # 启动线程(只是告诉操作系统准备好了,具体什么时候执行看调度器)
t.join()   # join() 会**阻塞主线程**,直到子线程执行完,避免主线程提前退出看不到结果
print(f'🏁 主线程 {threading.current_thread().name} 结束')

继承类创建(适合复杂复用)

import threading
import time

class CustomWorker(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)  # 必须先调用父类的初始化方法

    # 重写run()方法,子线程启动后执行的就是这个方法
    def run(self):
        print(f'✅ 自定义子线程 {self.name} 启动...')
        time.sleep(1)
        print(f'❌ 自定义子线程 {self.name} 结束')

t = CustomWorker(name='CustomTest')
t.start()
t.join()
print(f'🏁 主线程结束')

1.2 常用的线程属性/方法

threading模块自带了几个实用的全局工具,调试和管理线程很方便:

  • threading.current_thread():获取当前正在执行的线程实例
  • threading.active_count():返回当前活跃的线程总数(包括主线程)
  • threading.enumerate():返回当前所有活跃线程的列表

2. 线程同步与锁

多线程有个致命优势同时也是致命坑点:它们共享同一个进程的内存空间。如果多个线程同时修改同一个全局变量,就会出现竞争条件(Race Condition),导致结果不可预测。

2.1 竞争条件示例

比如下面这个累加器,10个线程各加10万次,预期结果是100万,但实际运行每次结果都不一样,而且大概率小于100万:

import threading

counter = 0  # 全局共享变量

def unsafe_increment():
    global counter
    for _ in range(100000):
        # 这一行代码底层其实是3步:读取counter、加1、写回counter
        # 多线程切换可能发生在这3步之间!
        counter += 1

threads = []
for _ in range(10):
    t = threading.Thread(target=unsafe_increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"❌ 不安全累加器最终值: {counter}")  # 多次运行试试,几乎不会是1000000

2.2 使用Lock(互斥锁)解决竞争

threading.Lock是最简单的同步原语,它的规则是:同一时间只有一个线程能拿到锁,其他线程必须等待直到锁被释放。

推荐用with语句管理锁,它会自动处理获取和释放,避免忘记释放锁导致的死锁(Deadlock):

import threading

counter = 0
lock = threading.Lock()  # 创建一个全局互斥锁

def safe_increment():
    global counter
    for _ in range(100000):
        # with lock会自动获取锁,代码块结束自动释放
        with lock:
            counter += 1

threads = []
for _ in range(10):
    t = threading.Thread(target=safe_increment)
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"✅ 安全累加器最终值: {counter}")  # 无论运行多少次,都是1000000

2.3 其他常用同步原语

除了基础的Lock,还有几个进阶原语可以解决更复杂的场景:

  • threading.RLock可重入锁,同一线程可以多次获取(适合递归调用)
  • threading.Condition条件变量,结合Lock使用,用于线程间“等待-通知”通信
  • threading.Semaphore信号量,控制同时访问某个资源的线程数(比如限制数据库连接数)
  • threading.Event事件对象,用于线程间简单的“开关式”通知

3. 绕不开的GIL(全局解释器锁)

很多初学者会疑惑:“Python有了多线程,为什么CPU密集型任务(比如科学计算、图像处理)反而变慢?”答案就在CPython解释器的GIL(Global Interpreter Lock)

3.1 GIL的本质

GIL是CPython解释器的实现细节(Jython、IronPython等解释器没有),它的规则是:任何时候,只有一个线程在执行Python字节码,其他线程必须等待GIL被释放。

GIL的释放时机主要有两个:

  1. 遇到I/O操作(比如time.sleep()、网络请求、读写文件)时
  2. Python 3.2+后,解释器会每15毫秒强制切换一次线程(不管有没有I/O)

3.2 GIL的影响

  1. I/O密集型任务:多线程仍能受益!因为遇到I/O时GIL会释放,CPU不会空等
  2. CPU密集型任务:多线程无法有效利用多核!因为同一时间只有一个线程在跑字节码,强制切换反而会增加开销
  3. C扩展计算:如果计算逻辑放在C/C++扩展中,可以手动释放GIL,此时多线程也能跑多核

3.3 处理GIL限制的方案

  • 使用多进程multiprocessing模块会为每个进程创建独立的解释器和GIL,完全绕过限制(适合CPU密集型)
  • 异步编程asyncio是单线程协作式并发,没有GIL切换开销(适合高并发I/O密集型)
  • 使用其他解释器:Jython(基于Java)、IronPython(基于.NET)没有GIL,但生态不如CPython完善
  • C扩展/Cython:在核心计算部分用C/Cython实现并释放GIL

4. 线程池与最佳实践

4.1 使用ThreadPoolExecutor(推荐)

手动创建和管理大量线程很麻烦(比如控制线程数量、回收资源、处理异常),Python 3.2+提供的concurrent.futures.ThreadPoolExecutor可以帮我们自动完成这些事。

基础用法

from concurrent.futures import ThreadPoolExecutor
import time

def square_task(n):
    """模拟一个带I/O的计算任务"""
    print(f"处理任务 {n}...")
    time.sleep(1)  # 模拟I/O
    return n * n

# 使用上下文管理器with,自动关闭线程池
with ThreadPoolExecutor(max_workers=4) as executor:  # max_workers建议设为CPU核心数*2左右(I/O密集型)
    # 提交10个任务,返回Future对象列表
    futures = [executor.submit(square_task, i) for i in range(10)]
    # 获取所有任务的结果(按提交顺序)
    results = [f.result() for f in futures]

print(f"🏁 所有任务结果: {results}")

快速获取结果(不按顺序)

如果不需要按提交顺序获取结果,可以用executor.map()concurrent.futures.as_completed()

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def square_task(n):
    time.sleep(n % 3)  # 让任务执行时间不一样
    return n * n

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(square_task, i): i for i in range(10)}
    # as_completed()会在任意一个任务完成时立即返回它的Future
    for future in as_completed(futures):
        n = futures[future]
        try:
            result = future.result()
            print(f"任务 {n} 完成,结果: {result}")
        except Exception as e:
            print(f"任务 {n} 出错: {e}")

4.2 多线程编程最佳实践

  1. 尽量避免共享全局变量:必须共享时优先用锁,且锁的粒度要尽可能小(锁整个循环会让多线程变成单线程,白折腾)
  2. 优先使用线程安全的数据结构:比如queue.Queue(线程安全的队列,适合线程间通信)
  3. 使用线程局部存储:每个线程独立持有一份数据,避免共享(见下一节)
  4. 妥善处理线程异常:子线程的异常默认不会传播到主线程,要在任务函数内部捕获,或者用Future的result()/exception()获取
  5. 不要滥用多线程:任务数很少、或者纯CPU密集型时,多线程反而可能更慢

5. 线程局部存储

如果每个线程需要保存自己的临时数据(比如数据库连接、用户会话),可以用threading.local()创建线程局部存储对象,每个线程对它的读写都是独立的。

import threading

# 创建一个全局的线程局部存储对象
local_db = threading.local()

def init_db_connection():
    """为当前线程初始化一个独立的数据库连接(模拟)"""
    local_db.conn = f"DBConn-{threading.current_thread().name}"

def use_db():
    """使用当前线程的数据库连接"""
    print(f"{threading.current_thread().name} 使用连接: {local_db.conn}")

def worker():
    init_db_connection()
    use_db()

threads = [
    threading.Thread(target=worker, name=f"Worker-{i}")
    for i in range(3)
]

for t in threads:
    t.start()
for t in threads:
    t.join()

6. 多线程 vs 多进程选择指南

特性多线程多进程
内存共享共享(方便但容易有竞争)不共享(独立进程空间)
创建/切换开销小(仅栈等少量资源)大(独立解释器、内存空间)
通信成本低(直接读写共享内存加锁)高(需要IPC:管道、队列等)
GIL影响受限制(I/O密集型可用)不受限(适合CPU密集型)
适用场景爬虫、网络请求、数据库操作科学计算、图像处理、视频编码

7. 现代Python并发编程总结

  1. I/O密集型、低并发:直接用ThreadPoolExecutor
  2. I/O密集型、高并发:优先考虑asyncio(单线程协作式,无切换开销)
  3. CPU密集型:用multiprocessingProcessPoolExecutor
  4. 复杂场景:比如需要分布式执行,用Celery等任务队列

Python 3.10+还提供了更强大的并发工具(比如asyncio.TaskGroup、更灵活的Executor超时控制),开发者可以根据具体需求选择合适的模型~