Python多进程编程指南

1. 进程基础

1.1 为什么要用多进程?

你有没有遇到过跑一个Python脚本,电脑CPU核心只用了10%不到的情况?或者处理大量数据时,单线程卡在IO间隙之外,还是慢悠悠的?这时候多进程就是救星——

进程是操作系统资源分配和调度的基本单位,每个进程都有独立的内存空间、文件描述符等,互不干扰(不像多线程共享全局变量容易踩坑),能真正利用多核CPU的并行能力。

Python为了规避解释器的GIL(全局解释器锁,这个后面提注意事项时会展开)对CPU密集型任务的限制,专门做了完善的多进程支持,常见的实现方式有:

  • os.fork()(仅限Unix/Linux/macOS,底层但不跨平台)
  • multiprocessing 模块(Python官方跨平台方案,支持精细控制)
  • concurrent.futures.ProcessPoolExecutor(Python 3.2+ 高级封装,代码更简洁)
  • subprocess 模块(主要用于调用外部命令进程)

2. 仅限Unix-like的底层创建:os.fork()

Unix/Linux/macOS内核提供了fork()系统调用,它会几乎完全复制当前父进程的副本(子进程),区别仅在返回值:

  • 父进程中返回子进程的PID(进程ID)
  • 子进程中返回0

来看个极简示例:

import os

print(f"父进程启动!PID:{os.getpid()}")
# 这里是关键分叉点
pid = os.fork()  

if pid == 0:
    # 子进程逻辑
    print(f"我是子进程,PID:{os.getpid()},我的父进程是:{os.getppid()}")
else:
    # 父进程逻辑
    print(f"我是父进程,PID:{os.getpid()},刚创建了子进程:{pid}")

⚠️ 必须注意的限制

  1. Windows系统完全不支持fork(),跑这段代码会报错
  2. 复制内存空间的开销很大(虽然有写时复制机制,但还是不如spawn安全通用)
  3. 子进程会继承父进程的所有资源状态,但修改不会同步回去

3. 官方跨平台首选:multiprocessing模块

为了统一不同平台的多进程实现,Python提供了模拟多线程API的multiprocessing,完全兼容所有主流操作系统。

3.1 单个子进程控制:Process

就像用threading.Thread创建线程一样,Process类用target指定子进程要跑的函数,args/kwargs传参,start()启动,join()等待结束。

from multiprocessing import Process
import os

# 子进程要执行的任务
def print_child_info(name: str) -> None:
    print(f"🚀 子进程启动 | 名称:{name} | PID:{os.getpid()}")

if __name__ == '__main__':
    # ⚠️ 必须加这行!Windows会解释成模块重新导入并执行所有顶层代码
    print(f"👨‍💼 父进程启动 | PID:{os.getpid()}")
    # 创建Process实例
    p = Process(target=print_child_info, args=("测试子进程",))
    print("⏳ 准备启动子进程...")
    p.start()  # 真正启动子进程
    p.join()   # 阻塞父进程,直到子进程结束再继续
    print("✅ 子进程已结束")

3.2 批量子进程管理:Pool进程池

如果需要同时跑几十个、上百个小任务,不要一个个创建Process——进程创建和销毁的开销不小,用进程池可以复用已创建的进程,效率提升很多。

推荐用Python 3.3+支持的上下文管理器with Pool(...),自动处理资源释放:

from multiprocessing import Pool
import os
import time
import random

# 模拟耗时的CPU密集型/IO密集型任务
def simulate_task(task_id: int) -> str:
    start_time = time.time()
    print(f"🛠️  任务{task_id}开始 | 处理进程:{os.getpid()}")
    # 随机休眠0-3秒,模拟耗时
    time.sleep(random.uniform(0, 3))
    cost_time = round(time.time() - start_time, 2)
    print(f"✅ 任务{task_id}完成 | 耗时:{cost_time}s")
    return f"任务{task_id}的结果"

if __name__ == '__main__':
    print(f"👨‍💼 父进程启动 | PID:{os.getpid()}")
    # 创建最大容量为4的进程池(默认等于CPU核心数)
    with Pool(processes=4) as pool:
        # 用apply_async异步提交5个任务(非阻塞)
        task_results = [pool.apply_async(simulate_task, args=(i,)) for i in range(5)]
        print("⏳ 所有任务已提交,等待完成...")
        # 获取所有任务的结果(如果某个任务没完成,会阻塞在这里)
        for res in task_results:
            print(f"📦 拿到:{res.get()}")
    print("🏁 所有任务和进程池已清理完毕")

📌 Pool的两个关键方法

  • apply_async(func, args)非阻塞异步提交,适合批量任务
  • apply(func, args)阻塞同步提交,等当前任务完成才提交下一个,几乎用不上

4. 外部命令进程调用:subprocess模块

如果你的任务不是写Python函数,而是调用Shell脚本、系统命令、其他语言程序,就用subprocess模块,它完全替代了旧版的os.systemos.popen,更安全可控。

4.1 简单调用(获取返回值和输出)

推荐用Python 3.5+的subprocess.run(),支持捕获标准输出/错误、设置超时等:

import subprocess

# 调用ls -l,捕获输出并转为文本(text=True)
result = subprocess.run(
    ["ls", "-l"],  # 参数必须是列表(避免Shell注入风险)
    capture_output=True,
    text=True,
    check=True  # 如果命令执行失败(返回码非0),抛出异常
)
print("📁 当前目录内容:")
print(result.stdout)

4.2 与外部进程交互

如果需要给外部进程发输入、收实时输出,用Popen类(配合上下文管理器):

import subprocess

# 启动Python交互式解释器,发一段简单代码
with subprocess.Popen(
    ["python3", "-i"],
    stdin=subprocess.PIPE,  # 可以给子进程发输入
    stdout=subprocess.PIPE, # 可以收子进程的输出
    stderr=subprocess.PIPE, # 可以收子进程的错误
    text=True,
    bufsize=1,
    universal_newlines=True
) as proc:
    # 发送命令并换行
    proc.stdin.write('print("👋 来自子进程的问候!")\n')
    proc.stdin.write('exit()\n')
    # 读取所有输出和错误
    out, err = proc.communicate()
    print("📤 子进程输出:")
    print(out.strip())

5. 进程间通信(IPC):Queue与Pipe

因为进程有独立的内存空间,不能像多线程那样直接共享全局变量,必须用专门的IPC机制传递数据。multiprocessing内置了两种常用的:

5.1 多生产者-多消费者:Queue

队列是最常用的IPC方式,线程安全、进程安全,支持多个进程往里面写、多个进程往外读。

from multiprocessing import Process, Queue
import os
import time
import random

# 生产者进程:往队列里塞数据
def producer(q: Queue) -> None:
    print(f"🏭 生产者启动 | PID:{os.getpid()}")
    for item in ["苹果", "香蕉", "橙子"]:
        print(f"📤 生产者放入:{item}")
        q.put(item)
        time.sleep(random.uniform(0.5, 1.5))

# 消费者进程:从队列里取数据
def consumer(q: Queue) -> None:
    print(f"🍽️  消费者启动 | PID:{os.getpid()}")
    while True:
        # get(True):阻塞直到队列有数据
        item = q.get(True)
        if item == "END":
            print("🍽️  收到结束信号,消费者退出")
            break
        print(f"📥 消费者取出:{item}")

if __name__ == '__main__':
    q = Queue()
    # 创建生产者和消费者
    p_prod = Process(target=producer, args=(q,))
    p_cons = Process(target=consumer, args=(q,))
    # 启动进程
    p_prod.start()
    p_cons.start()
    # 等待生产者完成
    p_prod.join()
    # 往队列里塞结束信号(有几个消费者就塞几个)
    q.put("END")
    p_cons.join()

5.2 一对一通信:Pipe

Pipe是双向(默认)或单向的管道,适合两个进程之间快速传递数据。

from multiprocessing import Process, Pipe

# 发送者进程
def send_msg(child_conn):
    msg = "👋 你好,接收者!这是通过Pipe传的消息"
    print(f"📤 发送者发送:{msg}")
    child_conn.send(msg)
    child_conn.close()

# 接收者进程
def recv_msg(parent_conn):
    msg = parent_conn.recv()
    print(f"📥 接收者收到:{msg}")
    parent_conn.close()

if __name__ == '__main__':
    # 创建双向管道,返回两个连接对象(parent给父进程,child给子进程)
    parent_conn, child_conn = Pipe()
    p = Process(target=send_msg, args=(child_conn,))
    p.start()
    recv_msg(parent_conn)
    p.join()

6. 现代Python高级封装:concurrent.futures.ProcessPoolExecutor

Python 3.2引入的concurrent.futures模块,把多进程和多线程的API统一了,代码更简洁,还支持map批量提交、异常捕获、超时控制等高级功能。

同样推荐用上下文管理器自动释放资源:

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def simple_task(task_id: int) -> str:
    start = time.time()
    print(f"🛠️  任务{task_id} | 进程:{os.getpid()}")
    time.sleep(random.uniform(0.2, 1.2))
    cost = round(time.time() - start, 2)
    return f"任务{task_id}耗时{cost}s"

if __name__ == '__main__':
    print(f"👨‍💼 主进程 | PID:{os.getpid()}")
    with ProcessPoolExecutor(max_workers=3) as executor:
        # 方式1:用map批量提交(顺序返回结果,哪怕后面的任务先完成)
        print("\n📋 用map批量提交:")
        for res in executor.map(simple_task, range(3)):
            print(f"📦 顺序结果:{res}")
        
        # 方式2:用submit+as_completed(谁先完成返回谁)
        print("\n🚀 用submit+as_completed:")
        futures = [executor.submit(simple_task, i+3) for i in range(3)]
        for future in futures:
            print(f"📦 完成结果:{future.result()}")

7. 最佳实践与避坑指南

7.1 避坑1:必须加if __name__ == '__main__'

Windows系统用spawn方式启动子进程时,会重新导入当前模块作为子进程的入口,如果不加这行,子进程会再次执行所有顶层代码,导致无限创建进程报错。

7.2 避坑2:GIL的影响

Python解释器的GIL会在同一时刻只允许一个线程执行Python字节码,所以多线程无法利用多核CPU做并行计算,但可以做IO密集型任务(IO时GIL会释放)。

  • 适合多进程:CPU密集型任务(比如数据压缩、图像处理、数学计算)
  • 适合多线程/异步IO:IO密集型任务(比如网络请求、文件读写)

7.3 避坑3:资源共享

尽量不要共享状态,用Queue/Pipe传递消息(避免死锁和数据竞争)。如果必须共享少量数据,用multiprocessing.ValueArray

from multiprocessing import Process, Value, Array

def add_num(num: Value) -> None:
    for _ in range(10000):
        # 必须加锁修改共享变量!
        with num.get_lock():
            num.value += 1

if __name__ == '__main__':
    # 创建共享的int变量,初始值0
    shared_num = Value('i', 0)
    p1 = Process(target=add_num, args=(shared_num,))
    p2 = Process(target=add_num, args=(shared_num,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"🔢 共享变量最终值:{shared_num.value}")  # 应该是20000

7.4 避坑4:子进程异常不会自动传播

如果子进程抛出异常,父进程默认看不到,只会在join()后看到exitcode非0。用ProcessPoolExecutorfuture.result()或者Poolget()可以捕获子进程的异常。


8. 快速总结

实现方式适用场景优点缺点
os.fork()Unix-like底层实验简单不跨平台、不安全
multiprocessing.Process需要精细控制单个子进程跨平台、功能全批量任务管理麻烦
multiprocessing.Pool批量CPU密集型任务跨平台、进程复用API不如高级封装简洁
concurrent.futures.ProcessPoolExecutor现代Python批量任务API统一、代码简洁、支持高级功能精细控制不如Process
subprocess调用外部命令/程序安全可控、功能全不适合Python内部函数任务

对于现代Python 3.5+开发,优先推荐:

  1. 批量任务 → ProcessPoolExecutor
  2. 精细控制单个子进程 → Process
  3. 调用外部命令 → subprocess.run()/Popen

合理使用多进程,就能充分发挥多核CPU的潜力,让你的程序飞起来!🚀