Python多进程编程指南
1. 进程基础
1.1 为什么要用多进程?
你有没有遇到过跑一个Python脚本,电脑CPU核心只用了10%不到的情况?或者处理大量数据时,单线程卡在IO间隙之外,还是慢悠悠的?这时候多进程就是救星——
进程是操作系统资源分配和调度的基本单位,每个进程都有独立的内存空间、文件描述符等,互不干扰(不像多线程共享全局变量容易踩坑),能真正利用多核CPU的并行能力。
Python为了规避解释器的GIL(全局解释器锁,这个后面提注意事项时会展开)对CPU密集型任务的限制,专门做了完善的多进程支持,常见的实现方式有:
os.fork()(仅限Unix/Linux/macOS,底层但不跨平台)
multiprocessing 模块(Python官方跨平台方案,支持精细控制)
concurrent.futures.ProcessPoolExecutor(Python 3.2+ 高级封装,代码更简洁)
subprocess 模块(主要用于调用外部命令进程)
2. 仅限Unix-like的底层创建:os.fork()
Unix/Linux/macOS内核提供了fork()系统调用,它会几乎完全复制当前父进程的副本(子进程),区别仅在返回值:
- 父进程中返回子进程的PID(进程ID)
- 子进程中返回0
来看个极简示例:
import os
print(f"父进程启动!PID:{os.getpid()}")
# 这里是关键分叉点
pid = os.fork()
if pid == 0:
# 子进程逻辑
print(f"我是子进程,PID:{os.getpid()},我的父进程是:{os.getppid()}")
else:
# 父进程逻辑
print(f"我是父进程,PID:{os.getpid()},刚创建了子进程:{pid}")
⚠️ 必须注意的限制:
- Windows系统完全不支持
fork(),跑这段代码会报错
- 复制内存空间的开销很大(虽然有写时复制机制,但还是不如
spawn安全通用)
- 子进程会继承父进程的所有资源状态,但修改不会同步回去
3. 官方跨平台首选:multiprocessing模块
为了统一不同平台的多进程实现,Python提供了模拟多线程API的multiprocessing,完全兼容所有主流操作系统。
3.1 单个子进程控制:Process类
就像用threading.Thread创建线程一样,Process类用target指定子进程要跑的函数,args/kwargs传参,start()启动,join()等待结束。
from multiprocessing import Process
import os
# 子进程要执行的任务
def print_child_info(name: str) -> None:
print(f"🚀 子进程启动 | 名称:{name} | PID:{os.getpid()}")
if __name__ == '__main__':
# ⚠️ 必须加这行!Windows会解释成模块重新导入并执行所有顶层代码
print(f"👨💼 父进程启动 | PID:{os.getpid()}")
# 创建Process实例
p = Process(target=print_child_info, args=("测试子进程",))
print("⏳ 准备启动子进程...")
p.start() # 真正启动子进程
p.join() # 阻塞父进程,直到子进程结束再继续
print("✅ 子进程已结束")
3.2 批量子进程管理:Pool进程池
如果需要同时跑几十个、上百个小任务,不要一个个创建Process——进程创建和销毁的开销不小,用进程池可以复用已创建的进程,效率提升很多。
推荐用Python 3.3+支持的上下文管理器with Pool(...),自动处理资源释放:
from multiprocessing import Pool
import os
import time
import random
# 模拟耗时的CPU密集型/IO密集型任务
def simulate_task(task_id: int) -> str:
start_time = time.time()
print(f"🛠️ 任务{task_id}开始 | 处理进程:{os.getpid()}")
# 随机休眠0-3秒,模拟耗时
time.sleep(random.uniform(0, 3))
cost_time = round(time.time() - start_time, 2)
print(f"✅ 任务{task_id}完成 | 耗时:{cost_time}s")
return f"任务{task_id}的结果"
if __name__ == '__main__':
print(f"👨💼 父进程启动 | PID:{os.getpid()}")
# 创建最大容量为4的进程池(默认等于CPU核心数)
with Pool(processes=4) as pool:
# 用apply_async异步提交5个任务(非阻塞)
task_results = [pool.apply_async(simulate_task, args=(i,)) for i in range(5)]
print("⏳ 所有任务已提交,等待完成...")
# 获取所有任务的结果(如果某个任务没完成,会阻塞在这里)
for res in task_results:
print(f"📦 拿到:{res.get()}")
print("🏁 所有任务和进程池已清理完毕")
📌 Pool的两个关键方法:
apply_async(func, args):非阻塞异步提交,适合批量任务
apply(func, args):阻塞同步提交,等当前任务完成才提交下一个,几乎用不上
4. 外部命令进程调用:subprocess模块
如果你的任务不是写Python函数,而是调用Shell脚本、系统命令、其他语言程序,就用subprocess模块,它完全替代了旧版的os.system、os.popen,更安全可控。
4.1 简单调用(获取返回值和输出)
推荐用Python 3.5+的subprocess.run(),支持捕获标准输出/错误、设置超时等:
import subprocess
# 调用ls -l,捕获输出并转为文本(text=True)
result = subprocess.run(
["ls", "-l"], # 参数必须是列表(避免Shell注入风险)
capture_output=True,
text=True,
check=True # 如果命令执行失败(返回码非0),抛出异常
)
print("📁 当前目录内容:")
print(result.stdout)
4.2 与外部进程交互
如果需要给外部进程发输入、收实时输出,用Popen类(配合上下文管理器):
import subprocess
# 启动Python交互式解释器,发一段简单代码
with subprocess.Popen(
["python3", "-i"],
stdin=subprocess.PIPE, # 可以给子进程发输入
stdout=subprocess.PIPE, # 可以收子进程的输出
stderr=subprocess.PIPE, # 可以收子进程的错误
text=True,
bufsize=1,
universal_newlines=True
) as proc:
# 发送命令并换行
proc.stdin.write('print("👋 来自子进程的问候!")\n')
proc.stdin.write('exit()\n')
# 读取所有输出和错误
out, err = proc.communicate()
print("📤 子进程输出:")
print(out.strip())
5. 进程间通信(IPC):Queue与Pipe
因为进程有独立的内存空间,不能像多线程那样直接共享全局变量,必须用专门的IPC机制传递数据。multiprocessing内置了两种常用的:
5.1 多生产者-多消费者:Queue
队列是最常用的IPC方式,线程安全、进程安全,支持多个进程往里面写、多个进程往外读。
from multiprocessing import Process, Queue
import os
import time
import random
# 生产者进程:往队列里塞数据
def producer(q: Queue) -> None:
print(f"🏭 生产者启动 | PID:{os.getpid()}")
for item in ["苹果", "香蕉", "橙子"]:
print(f"📤 生产者放入:{item}")
q.put(item)
time.sleep(random.uniform(0.5, 1.5))
# 消费者进程:从队列里取数据
def consumer(q: Queue) -> None:
print(f"🍽️ 消费者启动 | PID:{os.getpid()}")
while True:
# get(True):阻塞直到队列有数据
item = q.get(True)
if item == "END":
print("🍽️ 收到结束信号,消费者退出")
break
print(f"📥 消费者取出:{item}")
if __name__ == '__main__':
q = Queue()
# 创建生产者和消费者
p_prod = Process(target=producer, args=(q,))
p_cons = Process(target=consumer, args=(q,))
# 启动进程
p_prod.start()
p_cons.start()
# 等待生产者完成
p_prod.join()
# 往队列里塞结束信号(有几个消费者就塞几个)
q.put("END")
p_cons.join()
5.2 一对一通信:Pipe
Pipe是双向(默认)或单向的管道,适合两个进程之间快速传递数据。
from multiprocessing import Process, Pipe
# 发送者进程
def send_msg(child_conn):
msg = "👋 你好,接收者!这是通过Pipe传的消息"
print(f"📤 发送者发送:{msg}")
child_conn.send(msg)
child_conn.close()
# 接收者进程
def recv_msg(parent_conn):
msg = parent_conn.recv()
print(f"📥 接收者收到:{msg}")
parent_conn.close()
if __name__ == '__main__':
# 创建双向管道,返回两个连接对象(parent给父进程,child给子进程)
parent_conn, child_conn = Pipe()
p = Process(target=send_msg, args=(child_conn,))
p.start()
recv_msg(parent_conn)
p.join()
6. 现代Python高级封装:concurrent.futures.ProcessPoolExecutor
Python 3.2引入的concurrent.futures模块,把多进程和多线程的API统一了,代码更简洁,还支持map批量提交、异常捕获、超时控制等高级功能。
同样推荐用上下文管理器自动释放资源:
from concurrent.futures import ProcessPoolExecutor
import os
import time
import random
def simple_task(task_id: int) -> str:
start = time.time()
print(f"🛠️ 任务{task_id} | 进程:{os.getpid()}")
time.sleep(random.uniform(0.2, 1.2))
cost = round(time.time() - start, 2)
return f"任务{task_id}耗时{cost}s"
if __name__ == '__main__':
print(f"👨💼 主进程 | PID:{os.getpid()}")
with ProcessPoolExecutor(max_workers=3) as executor:
# 方式1:用map批量提交(顺序返回结果,哪怕后面的任务先完成)
print("\n📋 用map批量提交:")
for res in executor.map(simple_task, range(3)):
print(f"📦 顺序结果:{res}")
# 方式2:用submit+as_completed(谁先完成返回谁)
print("\n🚀 用submit+as_completed:")
futures = [executor.submit(simple_task, i+3) for i in range(3)]
for future in futures:
print(f"📦 完成结果:{future.result()}")
7. 最佳实践与避坑指南
7.1 避坑1:必须加if __name__ == '__main__'
Windows系统用spawn方式启动子进程时,会重新导入当前模块作为子进程的入口,如果不加这行,子进程会再次执行所有顶层代码,导致无限创建进程报错。
7.2 避坑2:GIL的影响
Python解释器的GIL会在同一时刻只允许一个线程执行Python字节码,所以多线程无法利用多核CPU做并行计算,但可以做IO密集型任务(IO时GIL会释放)。
- 适合多进程:CPU密集型任务(比如数据压缩、图像处理、数学计算)
- 适合多线程/异步IO:IO密集型任务(比如网络请求、文件读写)
7.3 避坑3:资源共享
尽量不要共享状态,用Queue/Pipe传递消息(避免死锁和数据竞争)。如果必须共享少量数据,用multiprocessing.Value或Array:
from multiprocessing import Process, Value, Array
def add_num(num: Value) -> None:
for _ in range(10000):
# 必须加锁修改共享变量!
with num.get_lock():
num.value += 1
if __name__ == '__main__':
# 创建共享的int变量,初始值0
shared_num = Value('i', 0)
p1 = Process(target=add_num, args=(shared_num,))
p2 = Process(target=add_num, args=(shared_num,))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"🔢 共享变量最终值:{shared_num.value}") # 应该是20000
7.4 避坑4:子进程异常不会自动传播
如果子进程抛出异常,父进程默认看不到,只会在join()后看到exitcode非0。用ProcessPoolExecutor的future.result()或者Pool的get()可以捕获子进程的异常。
8. 快速总结
对于现代Python 3.5+开发,优先推荐:
- 批量任务 →
ProcessPoolExecutor
- 精细控制单个子进程 →
Process
- 调用外部命令 →
subprocess.run()/Popen
合理使用多进程,就能充分发挥多核CPU的潜力,让你的程序飞起来!🚀