Python 多进程编程指南

1. 进程基础

1.1 为什么要用多进程?

你有没有遇到过这种情况:跑一个 Python 脚本处理大量数据,电脑 CPU 只用了不到 10%?或者单线程卡在 I/O 间隙,慢悠悠的?这时候就可以试试多进程。

进程是操作系统资源分配和调度的基本单位,每个进程都有自己独立的内存空间、文件描述符等,互不干扰——不像多线程共享全局变量那样容易踩坑。多进程能真正利用多核 CPU 的并行能力。

Python 为了规避解释器的 GIL(全局解释器锁,后面会解释)对 CPU 密集型任务的限制,专门提供了完善的多进程支持。常用实现方式有:

  • os.fork() —— 仅限 Unix/Linux/macOS,底层但不跨平台
  • multiprocessing 模块 —— Python 官方跨平台方案,控制精细
  • concurrent.futures.ProcessPoolExecutor —— Python 3.2+ 高级封装,代码更简洁
  • subprocess 模块 —— 主要用于调用外部命令进程

2. 仅限 Unix-like 的底层创建:os.fork()

Unix/Linux/macOS 提供了 fork() 系统调用,它会几乎完整复制当前父进程的副本(子进程),唯一的区别在返回值:

  • 父进程中返回子进程的 PID(进程 ID)
  • 子进程中返回 0

一个极简示例:

import os

print(f"父进程启动!PID:{os.getpid()}")
# 关键分叉点
pid = os.fork()  

if pid == 0:
    # 子进程逻辑
    print(f"我是子进程,PID:{os.getpid()},我的父进程是:{os.getppid()}")
else:
    # 父进程逻辑
    print(f"我是父进程,PID:{os.getpid()},刚创建了子进程:{pid}")

⚠️ 注意几个硬限制

  1. Windows 完全不支持 fork(),运行就会报错
  2. 复制内存空间的开销很大(虽有写时复制机制,但不如 spawn 安全通用)
  3. 子进程会继承父进程的所有资源状态,但之后的修改不会同步回父进程

3. 官方跨平台首选:multiprocessing 模块

为了统一不同平台的多进程实现,Python 提供了 multiprocessing 模块,完全兼容所有主流系统,API 风格也接近多线程。

3.1 单个子进程控制:Process

就像用 threading.Thread 创建线程那样,用 Process 类指定子进程要运行的函数,args/kwargs 传参,start() 启动,join() 等待结束。

from multiprocessing import Process
import os

def print_child_info(name: str) -> None:
    print(f"🚀 子进程启动 | 名称:{name} | PID:{os.getpid()}")

if __name__ == '__main__':
    # ⚠️ Windows 上必须加这行!否则子进程会重新导入模块并执行所有顶层代码
    print(f"👨‍💼 父进程启动 | PID:{os.getpid()}")
    p = Process(target=print_child_info, args=("测试子进程",))
    print("⏳ 准备启动子进程...")
    p.start()  # 真正启动子进程
    p.join()   # 阻塞父进程,直到子进程结束
    print("✅ 子进程已结束")

3.2 批量管理:Pool 进程池

如果需要同时跑几十上百个小任务,不要一个个创建 Process——进程创建和销毁的开销不小。用进程池可以复用已创建的进程,效率提高很多。

推荐使用 上下文管理器 with Pool(...)(Python 3.3+),自动释放资源:

from multiprocessing import Pool
import os
import time
import random

def simulate_task(task_id: int) -> str:
    start = time.time()
    print(f"🛠️  任务{task_id}开始 | 处理进程:{os.getpid()}")
    time.sleep(random.uniform(0, 3))   # 模拟耗时
    cost = round(time.time() - start, 2)
    print(f"✅ 任务{task_id}完成 | 耗时:{cost}s")
    return f"任务{task_id}的结果"

if __name__ == '__main__':
    print(f"👨‍💼 父进程启动 | PID:{os.getpid()}")
    with Pool(processes=4) as pool:   # 最大容量 4,默认等于 CPU 核心数
        # 用 apply_async 异步提交 5 个任务(非阻塞)
        task_results = [pool.apply_async(simulate_task, args=(i,)) for i in range(5)]
        print("⏳ 所有任务已提交,等待完成...")
        # 获取所有任务的结果(若某任务未完成,会阻塞在这里)
        for res in task_results:
            print(f"📦 拿到:{res.get()}")
    print("🏁 所有任务和进程池已清理完毕")

📌 Pool 的两个核心方法

  • apply_async(func, args)非阻塞异步提交,适合批量任务
  • apply(func, args)阻塞同步提交,等当前任务完成才提交下一个,几乎用不上

4. 外部命令进程调用:subprocess 模块

如果任务不是写 Python 函数,而是调用 Shell 脚本、系统命令、其他语言程序,就用 subprocess 模块。它完全替代了老旧的 os.systemos.popen,更安全可控。

4.1 简单调用(获取返回值和输出)

推荐使用 Python 3.5+ 的 subprocess.run(),支持捕获输出、设置超时等:

import subprocess

# 调用 ls -l,捕获输出并转为文本
result = subprocess.run(
    ["ls", "-l"],          # 参数必须用列表,避免 Shell 注入风险
    capture_output=True,
    text=True,
    check=True             # 命令失败(返回码非 0)时抛出异常
)
print("📁 当前目录内容:")
print(result.stdout)

4.2 与外部进程交互

如果需要给外部进程发送输入、读取实时输出,可以使用 Popen 类(配合上下文管理器):

import subprocess

with subprocess.Popen(
    ["python3", "-i"],
    stdin=subprocess.PIPE,   # 允许向子进程发送输入
    stdout=subprocess.PIPE,  # 接收子进程的输出
    stderr=subprocess.PIPE,  # 接收子进程的错误输出
    text=True,
    bufsize=1,
    universal_newlines=True
) as proc:
    # 发送命令并换行
    proc.stdin.write('print("👋 来自子进程的问候!")\n')
    proc.stdin.write('exit()\n')
    # 读取所有输出和错误
    out, err = proc.communicate()
    print("📤 子进程输出:")
    print(out.strip())

5. 进程间通信(IPC):Queue 与 Pipe

因为进程有独立的内存空间,不能像多线程那样直接共享全局变量,所以必须用专门的 IPC 机制传递数据。multiprocessing 内置了两种常用方式:

5.1 多生产者‑多消费者:Queue

队列是最常用的 IPC 方式,线程安全、进程安全,允许多个进程写入,多个进程读出。

from multiprocessing import Process, Queue
import os
import time
import random

def producer(q: Queue) -> None:
    print(f"🏭 生产者启动 | PID:{os.getpid()}")
    for item in ["苹果", "香蕉", "橙子"]:
        print(f"📤 生产者放入:{item}")
        q.put(item)
        time.sleep(random.uniform(0.5, 1.5))

def consumer(q: Queue) -> None:
    print(f"🍽️  消费者启动 | PID:{os.getpid()}")
    while True:
        item = q.get(True)   # 阻塞直到队列有数据
        if item == "END":
            print("🍽️  收到结束信号,消费者退出")
            break
        print(f"📥 消费者取出:{item}")

if __name__ == '__main__':
    q = Queue()
    p_prod = Process(target=producer, args=(q,))
    p_cons = Process(target=consumer, args=(q,))
    p_prod.start()
    p_cons.start()
    p_prod.join()
    # 往队列塞结束信号(有几个消费者就塞几个)
    q.put("END")
    p_cons.join()

5.2 一对一通信:Pipe

Pipe 是双向(默认)或单向的管道,特别适合两个进程之间快速传递数据。

from multiprocessing import Process, Pipe

def send_msg(child_conn):
    msg = "👋 你好,接收者!这是通过 Pipe 传的消息"
    print(f"📤 发送者发送:{msg}")
    child_conn.send(msg)
    child_conn.close()

def recv_msg(parent_conn):
    msg = parent_conn.recv()
    print(f"📥 接收者收到:{msg}")
    parent_conn.close()

if __name__ == '__main__':
    # 创建双向管道,返回两个连接对象(父进程用 parent,子进程用 child)
    parent_conn, child_conn = Pipe()
    p = Process(target=send_msg, args=(child_conn,))
    p.start()
    recv_msg(parent_conn)
    p.join()

6. 现代 Python 高级封装:concurrent.futures.ProcessPoolExecutor

Python 3.2 引入的 concurrent.futures 模块,统一了多进程和多线程的 API,代码更简洁,还支持 map 批量提交、异常捕获、超时控制等高级功能。

同样推荐用上下文管理器自动释放资源:

from concurrent.futures import ProcessPoolExecutor
import os
import time
import random

def simple_task(task_id: int) -> str:
    start = time.time()
    print(f"🛠️  任务{task_id} | 进程:{os.getpid()}")
    time.sleep(random.uniform(0.2, 1.2))
    cost = round(time.time() - start, 2)
    return f"任务{task_id}耗时{cost}s"

if __name__ == '__main__':
    print(f"👨‍💼 主进程 | PID:{os.getpid()}")
    with ProcessPoolExecutor(max_workers=3) as executor:
        # 方式1:用 map 批量提交(按顺序返回结果)
        print("\n📋 用 map 批量提交:")
        for res in executor.map(simple_task, range(3)):
            print(f"📦 顺序结果:{res}")
        
        # 方式2:用 submit + as_completed(谁先完成就返回谁)
        print("\n🚀 用 submit + as_completed:")
        futures = [executor.submit(simple_task, i+3) for i in range(3)]
        for future in futures:
            print(f"📦 完成结果:{future.result()}")

7. 最佳实践与避坑指南

7.1 避坑 1:必须加 if __name__ == '__main__'

Windows 使用 spawn 方式启动子进程时,会重新导入当前模块作为子进程的入口。如果不加这行,子进程会再次执行所有顶层代码,导致无限创建进程、报错。在所有多进程代码中都加上这句是最安全的做法

7.2 避坑 2:GIL 的影响

Python 解释器的 GIL(全局解释器锁)保证同一时刻只有一个线程执行 Python 字节码,因此多线程无法利用多核 CPU 做真正的并行计算,但可以胜任 I/O 密集型任务(因为 I/O 操作时 GIL 会释放)。

  • 适合多进程:CPU 密集型任务(数据压缩、图像处理、数学计算等)
  • 适合多线程/异步 I/O:I/O 密集型任务(网络请求、文件读写等)

7.3 避坑 3:资源共享

尽量不要直接共享状态,用 Queue / Pipe 传递消息,避免死锁和数据竞争。如果必须共享少量数据,可用 multiprocessing.ValueArray,且必须加锁保护

from multiprocessing import Process, Value

def add_num(num: Value) -> None:
    for _ in range(10000):
        with num.get_lock():     # 加锁修改共享变量
            num.value += 1

if __name__ == '__main__':
    shared_num = Value('i', 0)   # 创建共享的 int,初始为 0
    p1 = Process(target=add_num, args=(shared_num,))
    p2 = Process(target=add_num, args=(shared_num,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(f"🔢 共享变量最终值:{shared_num.value}")  # 应为 20000

7.4 避坑 4:子进程异常不会自动传播

如果子进程内部抛出异常,父进程默认是看不到的,只会在 join() 后看到非零的 exitcode
使用 ProcessPoolExecutorfuture.result()Poolget() 方法,可以捕获子进程的异常,让错误显式暴露出来。


8. 快速总结

实现方式适用场景优点缺点
os.fork()Unix-like 底层实验简单不跨平台、不安全
multiprocessing.Process精细控制单个子进程跨平台、功能全批量任务管理麻烦
multiprocessing.Pool批量 CPU 密集型任务跨平台、进程复用API 不如高级封装简洁
concurrent.futures.ProcessPoolExecutor现代 Python 批量任务API 统一、代码简洁、支持高级功能精细控制不如 Process
subprocess调用外部命令/程序安全可控、功能全不适合 Python 内部函数任务

现代 Python(≥3.5)推荐选择

  1. 批量任务ProcessPoolExecutor
  2. 精细控制单个子进程Process
  3. 调用外部命令subprocess.run() / Popen

合理使用多进程,就能充分发挥多核 CPU 的潜力,让你的程序飞起来!🚀