Python 多进程编程指南
1. 进程基础
1.1 为什么要用多进程?
你有没有遇到过这种情况:跑一个 Python 脚本处理大量数据,电脑 CPU 只用了不到 10%?或者单线程卡在 I/O 间隙,慢悠悠的?这时候就可以试试多进程。
进程是操作系统资源分配和调度的基本单位,每个进程都有自己独立的内存空间、文件描述符等,互不干扰——不像多线程共享全局变量那样容易踩坑。多进程能真正利用多核 CPU 的并行能力。
Python 为了规避解释器的 GIL(全局解释器锁,后面会解释)对 CPU 密集型任务的限制,专门提供了完善的多进程支持。常用实现方式有:
os.fork() —— 仅限 Unix/Linux/macOS,底层但不跨平台
multiprocessing 模块 —— Python 官方跨平台方案,控制精细
concurrent.futures.ProcessPoolExecutor —— Python 3.2+ 高级封装,代码更简洁
subprocess 模块 —— 主要用于调用外部命令进程
2. 仅限 Unix-like 的底层创建:os.fork()
Unix/Linux/macOS 提供了 fork() 系统调用,它会几乎完整复制当前父进程的副本(子进程),唯一的区别在返回值:
- 父进程中返回子进程的 PID(进程 ID)
- 子进程中返回 0
一个极简示例:
import os
print(f"父进程启动!PID:{os.getpid()}")
# 关键分叉点
pid = os.fork()
if pid == 0:
# 子进程逻辑
print(f"我是子进程,PID:{os.getpid()},我的父进程是:{os.getppid()}")
else:
# 父进程逻辑
print(f"我是父进程,PID:{os.getpid()},刚创建了子进程:{pid}")
⚠️ 注意几个硬限制:
- Windows 完全不支持
fork(),运行就会报错
- 复制内存空间的开销很大(虽有写时复制机制,但不如
spawn 安全通用)
- 子进程会继承父进程的所有资源状态,但之后的修改不会同步回父进程
3. 官方跨平台首选:multiprocessing 模块
为了统一不同平台的多进程实现,Python 提供了 multiprocessing 模块,完全兼容所有主流系统,API 风格也接近多线程。
3.1 单个子进程控制:Process 类
就像用 threading.Thread 创建线程那样,用 Process 类指定子进程要运行的函数,args/kwargs 传参,start() 启动,join() 等待结束。
from multiprocessing import Process
import os
def print_child_info(name: str) -> None:
print(f"🚀 子进程启动 | 名称:{name} | PID:{os.getpid()}")
if __name__ == '__main__':
# ⚠️ Windows 上必须加这行!否则子进程会重新导入模块并执行所有顶层代码
print(f"👨💼 父进程启动 | PID:{os.getpid()}")
p = Process(target=print_child_info, args=("测试子进程",))
print("⏳ 准备启动子进程...")
p.start() # 真正启动子进程
p.join() # 阻塞父进程,直到子进程结束
print("✅ 子进程已结束")
3.2 批量管理:Pool 进程池
如果需要同时跑几十上百个小任务,不要一个个创建 Process——进程创建和销毁的开销不小。用进程池可以复用已创建的进程,效率提高很多。
推荐使用 上下文管理器 with Pool(...)(Python 3.3+),自动释放资源:
from multiprocessing import Pool
import os
import time
import random
def simulate_task(task_id: int) -> str:
start = time.time()
print(f"🛠️ 任务{task_id}开始 | 处理进程:{os.getpid()}")
time.sleep(random.uniform(0, 3)) # 模拟耗时
cost = round(time.time() - start, 2)
print(f"✅ 任务{task_id}完成 | 耗时:{cost}s")
return f"任务{task_id}的结果"
if __name__ == '__main__':
print(f"👨💼 父进程启动 | PID:{os.getpid()}")
with Pool(processes=4) as pool: # 最大容量 4,默认等于 CPU 核心数
# 用 apply_async 异步提交 5 个任务(非阻塞)
task_results = [pool.apply_async(simulate_task, args=(i,)) for i in range(5)]
print("⏳ 所有任务已提交,等待完成...")
# 获取所有任务的结果(若某任务未完成,会阻塞在这里)
for res in task_results:
print(f"📦 拿到:{res.get()}")
print("🏁 所有任务和进程池已清理完毕")
📌 Pool 的两个核心方法:
apply_async(func, args):非阻塞异步提交,适合批量任务
apply(func, args):阻塞同步提交,等当前任务完成才提交下一个,几乎用不上
4. 外部命令进程调用:subprocess 模块
如果任务不是写 Python 函数,而是调用 Shell 脚本、系统命令、其他语言程序,就用 subprocess 模块。它完全替代了老旧的 os.system、os.popen,更安全可控。
4.1 简单调用(获取返回值和输出)
推荐使用 Python 3.5+ 的 subprocess.run(),支持捕获输出、设置超时等:
import subprocess
# 调用 ls -l,捕获输出并转为文本
result = subprocess.run(
["ls", "-l"], # 参数必须用列表,避免 Shell 注入风险
capture_output=True,
text=True,
check=True # 命令失败(返回码非 0)时抛出异常
)
print("📁 当前目录内容:")
print(result.stdout)
4.2 与外部进程交互
如果需要给外部进程发送输入、读取实时输出,可以使用 Popen 类(配合上下文管理器):
import subprocess
with subprocess.Popen(
["python3", "-i"],
stdin=subprocess.PIPE, # 允许向子进程发送输入
stdout=subprocess.PIPE, # 接收子进程的输出
stderr=subprocess.PIPE, # 接收子进程的错误输出
text=True,
bufsize=1,
universal_newlines=True
) as proc:
# 发送命令并换行
proc.stdin.write('print("👋 来自子进程的问候!")\n')
proc.stdin.write('exit()\n')
# 读取所有输出和错误
out, err = proc.communicate()
print("📤 子进程输出:")
print(out.strip())
5. 进程间通信(IPC):Queue 与 Pipe
因为进程有独立的内存空间,不能像多线程那样直接共享全局变量,所以必须用专门的 IPC 机制传递数据。multiprocessing 内置了两种常用方式:
5.1 多生产者‑多消费者:Queue
队列是最常用的 IPC 方式,线程安全、进程安全,允许多个进程写入,多个进程读出。
from multiprocessing import Process, Queue
import os
import time
import random
def producer(q: Queue) -> None:
print(f"🏭 生产者启动 | PID:{os.getpid()}")
for item in ["苹果", "香蕉", "橙子"]:
print(f"📤 生产者放入:{item}")
q.put(item)
time.sleep(random.uniform(0.5, 1.5))
def consumer(q: Queue) -> None:
print(f"🍽️ 消费者启动 | PID:{os.getpid()}")
while True:
item = q.get(True) # 阻塞直到队列有数据
if item == "END":
print("🍽️ 收到结束信号,消费者退出")
break
print(f"📥 消费者取出:{item}")
if __name__ == '__main__':
q = Queue()
p_prod = Process(target=producer, args=(q,))
p_cons = Process(target=consumer, args=(q,))
p_prod.start()
p_cons.start()
p_prod.join()
# 往队列塞结束信号(有几个消费者就塞几个)
q.put("END")
p_cons.join()
5.2 一对一通信:Pipe
Pipe 是双向(默认)或单向的管道,特别适合两个进程之间快速传递数据。
from multiprocessing import Process, Pipe
def send_msg(child_conn):
msg = "👋 你好,接收者!这是通过 Pipe 传的消息"
print(f"📤 发送者发送:{msg}")
child_conn.send(msg)
child_conn.close()
def recv_msg(parent_conn):
msg = parent_conn.recv()
print(f"📥 接收者收到:{msg}")
parent_conn.close()
if __name__ == '__main__':
# 创建双向管道,返回两个连接对象(父进程用 parent,子进程用 child)
parent_conn, child_conn = Pipe()
p = Process(target=send_msg, args=(child_conn,))
p.start()
recv_msg(parent_conn)
p.join()
6. 现代 Python 高级封装:concurrent.futures.ProcessPoolExecutor
Python 3.2 引入的 concurrent.futures 模块,统一了多进程和多线程的 API,代码更简洁,还支持 map 批量提交、异常捕获、超时控制等高级功能。
同样推荐用上下文管理器自动释放资源:
from concurrent.futures import ProcessPoolExecutor
import os
import time
import random
def simple_task(task_id: int) -> str:
start = time.time()
print(f"🛠️ 任务{task_id} | 进程:{os.getpid()}")
time.sleep(random.uniform(0.2, 1.2))
cost = round(time.time() - start, 2)
return f"任务{task_id}耗时{cost}s"
if __name__ == '__main__':
print(f"👨💼 主进程 | PID:{os.getpid()}")
with ProcessPoolExecutor(max_workers=3) as executor:
# 方式1:用 map 批量提交(按顺序返回结果)
print("\n📋 用 map 批量提交:")
for res in executor.map(simple_task, range(3)):
print(f"📦 顺序结果:{res}")
# 方式2:用 submit + as_completed(谁先完成就返回谁)
print("\n🚀 用 submit + as_completed:")
futures = [executor.submit(simple_task, i+3) for i in range(3)]
for future in futures:
print(f"📦 完成结果:{future.result()}")
7. 最佳实践与避坑指南
7.1 避坑 1:必须加 if __name__ == '__main__'
Windows 使用 spawn 方式启动子进程时,会重新导入当前模块作为子进程的入口。如果不加这行,子进程会再次执行所有顶层代码,导致无限创建进程、报错。在所有多进程代码中都加上这句是最安全的做法。
7.2 避坑 2:GIL 的影响
Python 解释器的 GIL(全局解释器锁)保证同一时刻只有一个线程执行 Python 字节码,因此多线程无法利用多核 CPU 做真正的并行计算,但可以胜任 I/O 密集型任务(因为 I/O 操作时 GIL 会释放)。
- 适合多进程:CPU 密集型任务(数据压缩、图像处理、数学计算等)
- 适合多线程/异步 I/O:I/O 密集型任务(网络请求、文件读写等)
7.3 避坑 3:资源共享
尽量不要直接共享状态,用 Queue / Pipe 传递消息,避免死锁和数据竞争。如果必须共享少量数据,可用 multiprocessing.Value 或 Array,且必须加锁保护:
from multiprocessing import Process, Value
def add_num(num: Value) -> None:
for _ in range(10000):
with num.get_lock(): # 加锁修改共享变量
num.value += 1
if __name__ == '__main__':
shared_num = Value('i', 0) # 创建共享的 int,初始为 0
p1 = Process(target=add_num, args=(shared_num,))
p2 = Process(target=add_num, args=(shared_num,))
p1.start()
p2.start()
p1.join()
p2.join()
print(f"🔢 共享变量最终值:{shared_num.value}") # 应为 20000
7.4 避坑 4:子进程异常不会自动传播
如果子进程内部抛出异常,父进程默认是看不到的,只会在 join() 后看到非零的 exitcode。
使用 ProcessPoolExecutor 的 future.result() 或 Pool 的 get() 方法,可以捕获子进程的异常,让错误显式暴露出来。
8. 快速总结
现代 Python(≥3.5)推荐选择:
- 批量任务 →
ProcessPoolExecutor
- 精细控制单个子进程 →
Process
- 调用外部命令 →
subprocess.run() / Popen
合理使用多进程,就能充分发挥多核 CPU 的潜力,让你的程序飞起来!🚀