Python多线程编程指南
并发编程是提升Python应用性能的常见手段,其中多线程因为创建开销小、数据共享便捷、易用性高,在I/O密集型场景下特别吃香(比如爬虫、数据库操作、网络请求)。这篇文章会带你快速梳理Python多线程的核心知识点,从基础用法、常见坑点、GIL限制到最佳实践,代码高亮,段落分明,3000字内搞定~
1. 线程基础
Python通过标准库threading提供多线程支持(底层封装了系统线程接口:Windows下用Win32线程,Linux/macOS下用pthread)。每个进程默认会有一个主线程(MainThread),我们可以通过它创建额外的子线程来执行并发任务。
1.1 创建线程的两种方式
最常用的是函数式创建,代码简洁直观;如果需要更细粒度的控制(比如复用线程类),可以用继承类创建。
函数式创建(推荐入门用)
import threading
import time
def worker():
"""子线程要做的事:假装忙一会儿(sleep模拟真实I/O)"""
print(f'✅ 子线程 {threading.current_thread().name} 启动...')
time.sleep(1)
print(f'❌ 子线程 {threading.current_thread().name} 结束')
# 创建线程对象:target是要执行的函数,name给线程起个便于调试的名字
t = threading.Thread(target=worker, name='TestWorker')
t.start() # 启动线程(只是告诉操作系统准备好了,具体什么时候执行看调度器)
t.join() # join() 会**阻塞主线程**,直到子线程执行完,避免主线程提前退出看不到结果
print(f'🏁 主线程 {threading.current_thread().name} 结束')
继承类创建(适合复杂复用)
import threading
import time
class CustomWorker(threading.Thread):
def __init__(self, name):
super().__init__(name=name) # 必须先调用父类的初始化方法
# 重写run()方法,子线程启动后执行的就是这个方法
def run(self):
print(f'✅ 自定义子线程 {self.name} 启动...')
time.sleep(1)
print(f'❌ 自定义子线程 {self.name} 结束')
t = CustomWorker(name='CustomTest')
t.start()
t.join()
print(f'🏁 主线程结束')
1.2 常用的线程属性/方法
threading模块自带了几个实用的全局工具,调试和管理线程很方便:
threading.current_thread():获取当前正在执行的线程实例
threading.active_count():返回当前活跃的线程总数(包括主线程)
threading.enumerate():返回当前所有活跃线程的列表
2. 线程同步与锁
多线程有个致命优势同时也是致命坑点:它们共享同一个进程的内存空间。如果多个线程同时修改同一个全局变量,就会出现竞争条件(Race Condition),导致结果不可预测。
2.1 竞争条件示例
比如下面这个累加器,10个线程各加10万次,预期结果是100万,但实际运行每次结果都不一样,而且大概率小于100万:
import threading
counter = 0 # 全局共享变量
def unsafe_increment():
global counter
for _ in range(100000):
# 这一行代码底层其实是3步:读取counter、加1、写回counter
# 多线程切换可能发生在这3步之间!
counter += 1
threads = []
for _ in range(10):
t = threading.Thread(target=unsafe_increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"❌ 不安全累加器最终值: {counter}") # 多次运行试试,几乎不会是1000000
2.2 使用Lock(互斥锁)解决竞争
threading.Lock是最简单的同步原语,它的规则是:同一时间只有一个线程能拿到锁,其他线程必须等待直到锁被释放。
推荐用with语句管理锁,它会自动处理获取和释放,避免忘记释放锁导致的死锁(Deadlock):
import threading
counter = 0
lock = threading.Lock() # 创建一个全局互斥锁
def safe_increment():
global counter
for _ in range(100000):
# with lock会自动获取锁,代码块结束自动释放
with lock:
counter += 1
threads = []
for _ in range(10):
t = threading.Thread(target=safe_increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"✅ 安全累加器最终值: {counter}") # 无论运行多少次,都是1000000
2.3 其他常用同步原语
除了基础的Lock,还有几个进阶原语可以解决更复杂的场景:
threading.RLock:可重入锁,同一线程可以多次获取(适合递归调用)
threading.Condition:条件变量,结合Lock使用,用于线程间“等待-通知”通信
threading.Semaphore:信号量,控制同时访问某个资源的线程数(比如限制数据库连接数)
threading.Event:事件对象,用于线程间简单的“开关式”通知
3. 绕不开的GIL(全局解释器锁)
很多初学者会疑惑:“Python有了多线程,为什么CPU密集型任务(比如科学计算、图像处理)反而变慢?”答案就在CPython解释器的GIL(Global Interpreter Lock)。
3.1 GIL的本质
GIL是CPython解释器的实现细节(Jython、IronPython等解释器没有),它的规则是:任何时候,只有一个线程在执行Python字节码,其他线程必须等待GIL被释放。
GIL的释放时机主要有两个:
- 遇到I/O操作(比如
time.sleep()、网络请求、读写文件)时
- Python 3.2+后,解释器会每15毫秒强制切换一次线程(不管有没有I/O)
3.2 GIL的影响
- I/O密集型任务:多线程仍能受益!因为遇到I/O时GIL会释放,CPU不会空等
- CPU密集型任务:多线程无法有效利用多核!因为同一时间只有一个线程在跑字节码,强制切换反而会增加开销
- C扩展计算:如果计算逻辑放在C/C++扩展中,可以手动释放GIL,此时多线程也能跑多核
3.3 处理GIL限制的方案
- 使用多进程:
multiprocessing模块会为每个进程创建独立的解释器和GIL,完全绕过限制(适合CPU密集型)
- 异步编程:
asyncio是单线程协作式并发,没有GIL切换开销(适合高并发I/O密集型)
- 使用其他解释器:Jython(基于Java)、IronPython(基于.NET)没有GIL,但生态不如CPython完善
- C扩展/Cython:在核心计算部分用C/Cython实现并释放GIL
4. 线程池与最佳实践
4.1 使用ThreadPoolExecutor(推荐)
手动创建和管理大量线程很麻烦(比如控制线程数量、回收资源、处理异常),Python 3.2+提供的concurrent.futures.ThreadPoolExecutor可以帮我们自动完成这些事。
基础用法
from concurrent.futures import ThreadPoolExecutor
import time
def square_task(n):
"""模拟一个带I/O的计算任务"""
print(f"处理任务 {n}...")
time.sleep(1) # 模拟I/O
return n * n
# 使用上下文管理器with,自动关闭线程池
with ThreadPoolExecutor(max_workers=4) as executor: # max_workers建议设为CPU核心数*2左右(I/O密集型)
# 提交10个任务,返回Future对象列表
futures = [executor.submit(square_task, i) for i in range(10)]
# 获取所有任务的结果(按提交顺序)
results = [f.result() for f in futures]
print(f"🏁 所有任务结果: {results}")
快速获取结果(不按顺序)
如果不需要按提交顺序获取结果,可以用executor.map()或concurrent.futures.as_completed():
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def square_task(n):
time.sleep(n % 3) # 让任务执行时间不一样
return n * n
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(square_task, i): i for i in range(10)}
# as_completed()会在任意一个任务完成时立即返回它的Future
for future in as_completed(futures):
n = futures[future]
try:
result = future.result()
print(f"任务 {n} 完成,结果: {result}")
except Exception as e:
print(f"任务 {n} 出错: {e}")
4.2 多线程编程最佳实践
- 尽量避免共享全局变量:必须共享时优先用锁,且锁的粒度要尽可能小(锁整个循环会让多线程变成单线程,白折腾)
- 优先使用线程安全的数据结构:比如
queue.Queue(线程安全的队列,适合线程间通信)
- 使用线程局部存储:每个线程独立持有一份数据,避免共享(见下一节)
- 妥善处理线程异常:子线程的异常默认不会传播到主线程,要在任务函数内部捕获,或者用Future的
result()/exception()获取
- 不要滥用多线程:任务数很少、或者纯CPU密集型时,多线程反而可能更慢
5. 线程局部存储
如果每个线程需要保存自己的临时数据(比如数据库连接、用户会话),可以用threading.local()创建线程局部存储对象,每个线程对它的读写都是独立的。
import threading
# 创建一个全局的线程局部存储对象
local_db = threading.local()
def init_db_connection():
"""为当前线程初始化一个独立的数据库连接(模拟)"""
local_db.conn = f"DBConn-{threading.current_thread().name}"
def use_db():
"""使用当前线程的数据库连接"""
print(f"{threading.current_thread().name} 使用连接: {local_db.conn}")
def worker():
init_db_connection()
use_db()
threads = [
threading.Thread(target=worker, name=f"Worker-{i}")
for i in range(3)
]
for t in threads:
t.start()
for t in threads:
t.join()
6. 多线程 vs 多进程选择指南
7. 现代Python并发编程总结
- I/O密集型、低并发:直接用
ThreadPoolExecutor
- I/O密集型、高并发:优先考虑
asyncio(单线程协作式,无切换开销)
- CPU密集型:用
multiprocessing或ProcessPoolExecutor
- 复杂场景:比如需要分布式执行,用
Celery等任务队列
Python 3.10+还提供了更强大的并发工具(比如asyncio.TaskGroup、更灵活的Executor超时控制),开发者可以根据具体需求选择合适的模型~