multiprocessing包详解【Python】
简介
multiprocessing
是 Python 标准库中的一个包,它支持进程间的并发执行,提供了一个与 threading 模块相似的API。通过使用 multiprocessing
,开发者可以在 Python 程序中创建新的进程,每个进程都拥有自己的 Python 解释器和内存空间,从而能够并行执行任务。这在进行CPU密集型任务时尤其有用,因为它可以绕过全局解释器锁(GIL),让多核或多CPU的系统可以被充分利用.
主要功能
- 进程管理
Process
类:用于表示一个进程对象。它允许你启动、停止、等待进程结束等。可以通过传递目标函数和参数来创建 Process 实例。
- 进程间通信(IPC)
Queue
:提供了一种安全的方式来在进程之间进行数据交换。队列内部使用锁来保证数据的一致性。Pipe
:提供了一种进程间双向通信的方法。管道可以是双向的(双工)或单向的(半双工)。
- 同步原语
Lock
:用于同步进程间的操作,确保一次只有一个进程可以访问共享资源。Event
:允许进程等待某些事件的发生。进程可以通过调用 set() 方法来触发事件。Semaphore
:用于限制对共享资源的访问,它初始化一个计数器,该计数器表示可以同时访问资源的进程数量。Condition
:允许一个或多个进程等待某个条件的发生,同时提供了通知机制。
- 进程池
Pool
类:允许创建一组进程池,可以将任务分配给池中的进程执行。支持同步和异步的任务执行方式,如 map、apply、apply_async 和 map_async。
- 共享状态
- 共享内存:
Value
和Array
类允许在不同进程之间共享数据。这些数据存储在共享内存中,可以由多个进程并发访问。 Manager
:提供了一种更灵活的方式来共享数据。它可以创建一个服务器进程,其他进程通过代理的方式访问服务器进程中的对象,如列表、字典等。
- 共享内存:
- 其他组件
cpu_count
:返回可用的 CPU 数量,这对于决定进程池的大小很有用。current_process
:返回当前进程的信息。active_children
:返回当前活跃子进程的列表。
功能详解
进程管理
Process
类
构造方法
- init(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
group
:目前为保留参数,用于未来扩展,应始终为 None。target
:表示这个进程实例所调用对象,你可以传入一个可调用对象或函数,这个对象会在新进程启动时执行。name
:为进程指定一个名称,主要用于调试和跟踪。args
:是 target 调用时的位置参数,传递给函数的参数元组。kwargs
:是 target 调用时的关键字参数,传递给函数的字典。daemon
:如果设置为 True,则这个进程将被标记为守护进程,主程序结束时会自动杀死守护进程。默认值 None 表示继承父进程的守护标志。
实例方法
-
start()
- 启动进程。该方法将为目标函数创建一个新的进程,并在新进程中调用该函数。
-
join(timeout=None)
- 阻塞调用线程,直到进程的执行结束或达到指定的超时时间。如果未指定 timeout,则会一直等待直到进程结束。
-
is_alive()
- 返回进程是否还活着。一个活着的进程是启动而且尚未终止的进程。
-
terminate()
- 立即终止进程。注意,这种方法终止进程是不安全的,可能会导致进程资源未能被正确清理(如临时文件、锁等)。
-
kill()
- Python 3.7 中新增。作用同 terminate(),但使用更强的信号 SIGKILL,确保进程被终止。
-
close()
- Python 3.7 中新增。关闭 Process 对象,释放所有相关资源。如果进程尚未结束,则首先需要调用 join()
属性
-
exitcode
- 进程的退出代码。进程还在运行时为 None,正常退出时为 0,如果进程因信号而终止,则为负信号号码。
-
name
- 进程的名称。
-
daemon
- 获取或设置进程的守护标志。如果这个进程是守护进程,则返回 True。
-
pid
- 进程的 PID。如果进程还未启动,则为 None。
示例
from multiprocessing import Process
import timedef worker(name, sleep_time):print(f"Started worker {name}")time.sleep(sleep_time)print(f"Sleep time :{sleep_time}")if __name__ == "__main__":for i in range(3):process = Process(target=worker, args=(f'Bob-{i}',i), name=f'Worker-{i}')process.start() # 启动进程print(f"进程是否存活:{process.is_alive()}")print(time.time())>>>
进程是否存活:True
1734319313.823571
进程是否存活:True
1734319313.824584
进程是否存活:True
1734319313.825512
Started worker Bob-1
Started worker Bob-2
Started worker Bob-0
Sleep time :0
Sleep time :1
Sleep time :2
- 这个例子展示了如何创建和启动一个进程,以及如何等待进程完成。使用
Process
类的方法和属性可以让你更精确地控制进程的生命周期和行为 - 可以观察到程序并行运行,而如果使用
join()
方法,则各个进程则会被阻塞
进程间通信(IPC)
Queue
类
Queue
类是 multiprocessing
模块中提供的一个重要组件,用于实现进程间通信(IPC)。通过队列,不同的进程可以安全地交换信息或数据。
构造方法
Queue(maxsize=0)
- 初始化一个队列对象。maxsize 参数指定队列中允许的最大项数。如果 maxsize 小于或等于零,则队列大小为无限。
实例方法
-
put(item, block=True, timeout=None)
- 将 item 放入队列中。
- 如果 block 为 True(默认值),且 timeout 为 None,则在必要时阻塞,直到有空间可用。如果 timeout 是一个正数,它将最多阻塞 timeout 秒,然后抛出 queue.Full 异常(如果在这段时间内没有空间可用)。
- 如果 block 为 False,但队列已满,则立即抛出 queue.Full 异常。
-
get(block=True, timeout=None)
- 从队列中移除并返回一个项目。
- 如果 block 为 True(默认值),且 timeout 为 None,则在必要时阻塞,直到有项目可用。如果 timeout 是一个正数,它将最多阻塞 timeout 秒,然后抛出 queue.Empty 异常(如果在这段时间内没有项目可用)。
如果 block 为 False,但队列为空,则立即抛出 queue.Empty 异常。
-
empty()
- 返回 True 如果队列为空;否则,返回 False。注意,这个结果只是一个瞬间的快照,不保证后续的 put() 或 get() 调用会成功。
-
full()
- 返回 True 如果队列已满;否则,返回 False。与 empty() 方法类似,这个结果也只是一个瞬间的快照。
-
qsize()
- 返回队列中大约的项目数。注意,“大约”,是因为如果同时有其他线程或进程在修改队列,那么返回的结果可能不准确。
-
close()
- 关闭队列。这个方法在 Python 3.7 中被引入。
-
join_thread()
- 阻塞,直到处理队列的背景线程退出。这通常意味着队列已被彻底耗尽,且不再有任何未处理的项目。
-
cancel_join_thread()
- 阻止队列的加入线程。这通常用于在进程间队列的生产者进程已经知道消费者进程已经终止时,避免无限阻塞。
示例
from multiprocessing import Process, Queuedef worker(q, msg):q.put(msg)if __name__ == "__main__":msg = "Hello from parent process"q = Queue()p_1 = Process(target=worker, args=(q, f"{msg}-1"))p_2 = Process(target=worker, args=(q, f"{msg}-2"))p_1.start()p_2.start()p_1.join()p_2.join()while not q.empty():print(q.get()) >>>
Hello from parent process-1
Hello from parent process-2
注意事项
- 在使用 Queue 时,要注意在所有项目都被处理并且不再需要队列时,关闭和/或加入队列的线程,以避免资源泄漏。
- 在多进程环境下,对于队列的操作(put、get)可能需要考虑适当的超时时间,以避免死锁或阻塞过长时间。
Pipe
函数
Pipe 函数是 multiprocessing 模块中用于进程间通信的另一种主要机制。它创建了一对连接对象,默认情况下是双向的(双工),但也可以配置为单向(半双工)。通过管道,进程可以相互发送和接收 Python 对象。Pipe 通常用于两个进程间的通信。
Pipe() 函数
multiprocessing.Pipe(duplex=True)
- duplex:如果 duplex 是 True(默认值),则创建的管道是双向的。如果是 False,则创建的管道是单向的,即只能用于发送或接收。
Pipe() 函数返回一对 Connection 对象,代表管道的两端。
Connection
对象的方法
管道两端的 Connection 对象提供了一组用于通信的方法:
-
send(obj)
- 发送一个对象到管道的另一端。obj 参数可以是任何可pickle的对象。
-
recv()
- 从管道接收另一端发送过来的对象。此方法会阻塞,直到管道的另一端有数据发送过来。
-
fileno()
- 返回一个代表连接的文件描述符的整数。这个文件描述符可以用于底层I/O操作,例如将其整合到事件循环中。
-
close()
- 关闭连接。关闭后,连接不再可用。
-
poll(timeout=None)
- 检查管道的另一端是否发送了数据。如果 timeout 是 None(默认值),则立即返回结果。如果 timeout 是一个数字,则最多等待 timeout 秒。
- 返回 True 表示有数据可读取,False 表示没有。
- 检查管道的另一端是否发送了数据。如果 timeout 是 None(默认值),则立即返回结果。如果 timeout 是一个数字,则最多等待 timeout 秒。
示例
from multiprocessing import Process, Pipedef worker(conn):conn.send([42, None, 'hello from child'])conn.close()if __name__ == '__main__':parent_conn, child_conn = Pipe()p = Process(target=worker, args=(child_conn,))p.start()print(parent_conn.recv()) # 输出: [42, None, 'hello from child']parent_conn.close()p.join()>>>
[42, None, 'hello from child']
注意事项
- 使用管道时,务必注意关闭不再使用的连接端,以避免资源泄露或不必要的阻塞。
- 管道适用于少量进程间的通信。对于涉及多个进程的复杂场景,可能需要考虑其他通信机制,如
Queue
。 - 在双向管道中,如果两个进程同时尝试发送数据,而没有对应的接收操作,可能会导致死锁。设计通信协议时,应确保发送和接收操作的逻辑顺序能够预防死锁。
同步原语
Lock
类
Lock
类是 multiprocessing
模块中提供的一种简单的同步原语。它用于控制多个进程对共享资源的访问,以防止资源的并发访问导致数据错乱或损坏。Lock 类似于线程模块中的锁,但它是为进程间同步而设计的。
构造方法
Lock 类没有特定的构造参数,你可以直接创建一个 Lock 实例
from multiprocessing import Lock
lock = Lock()
实例方法
-
acquire(block=True, timeout=-1)
- 尝试获取锁。
- 如果 block 为 True(默认值),则调用将阻塞直到锁被释放,然后获取锁并返回 True。如果 block 为 False,则调用将不阻塞,如果无法立即- 获取锁,则返回 False。
- timeout 参数(仅当 block=True 时有效)允许指定阻塞的最大时间(秒)。timeout=-1(默认值)表示无限等待。如果指定了 timeout 并在超时期间锁未被释放,则返回 False。
-
release()
-
释放锁。只有在当前进程持有锁时才能调用此方法;否则,将抛出 RuntimeError。释放锁后,其他阻塞在 acquire() 调用中等待获取锁的进程将能够获取锁。
-
locked()
- 如果锁当前被持有,则返回 True;否则返回 False。注意,这个方法的结果仅仅是一个瞬时状态的反映,调用它的时候锁的状态可能已经改变。
示例
from multiprocessing import Process, Lock
import time# 定义一个简单的共享资源访问函数
def worker_with(lock, num):with lock:print(f"Worker {num} has acquired the lock")time.sleep(1)print(f"Worker {num} is releasing the lock")if __name__ == "__main__":lock = Lock()processes = [Process(target=worker_with, args=(lock, n)) for n in range(5)]for p in processes:p.start()for p in processes:p.join()print("Processing complete.")>>>
确保了在任何时刻只有一个进程能够访问共享资源,从而避免了并发访问的问题
注意事项
- 使用锁时应该小心,避免出现死锁情况。确保每次成功获取锁之后,都能够正确释放锁。
- 过度使用锁可能会导致程序性能下降,因为它限制了代码并发执行的能力。在设计程序时,应尽量减少锁的使用,只在绝对必要时使用锁来保护共享资源。
- 与所有同步原语一样,
Lock
应该谨慎使用,以避免复杂的同步问题。
Event
类
Event
类是 Python multiprocessing
模块中提供的一个同步原语,用于在进程之间通信和协调。它模拟了线程模块中的 Event 对象,允许一个进程向其他多个进程发出某个事件的发生,这对于在多个进程之间同步操作非常有用
构造方法
创建一个 Event
对象很简单,不需要任何参数
from multiprocessing import Event
event = Event()
实例方法
set()
- 将内部标志设置为 True。所有正在等待这个事件的进程将被唤醒。
clear()
- 将内部标志设置为 False。之后,调用 wait() 的进程将会被阻塞,直到 set() 被调用。
is_set()
- 返回 Event 的内部标志的当前状态。如果标志为 True,返回 True;否则返回 False。
wait(timeout=None)
- 阻塞调用进程,直到内部标志为 True。如果 timeout 为 None(默认值),则无限期等待。timeout 是一个数值,表示等待的最大秒数。
- 如果在 wait() 调用期间内部标志被设置为 True,或者在调用 wait() 时内部标志已经是 True,则返回 True;如果是因为超时而返回,则返回 False。
示例
from multiprocessing import Process, Event
import timedef worker(event, id):print(f"Worker {id} waiting for event.")event.wait() # 阻塞,直到事件被设置。print(f"Worker {id} received event.")if __name__ == "__main__":event = Event()# 创建并启动三个工作进程workers = [Process(target=worker, args=(event, i)) for i in range(3)]for w in workers:w.start()print("Main process doing some work.")time.sleep(2) # 模拟主进程工作一段时间event.set() # 设置事件,唤醒所有等待的进程print("Main process triggered the event.")for w in workers:w.join() # 等待所有工作进程完成print("Main process exiting.")>>>
Main process doing some work.
Worker 0 waiting for event.
Worker 1 waiting for event.
Worker 2 waiting for event.
Main process triggered the event.
Worker 0 received event.
Worker 1 received event.
Worker 2 received event.
Main process exiting.
注意事项
- 使用
Event
可以实现进程间的简单通信,但它不提供保护共享资源的机制。如果需要对共享资源进行访问控制,应考虑使用锁(如Lock
、RLock
)。 Event
对象适用于当一个进程必须等待一个或多个进程发出特定信号才能继续执行的场景。- 在使用
Event
进行同步时,要注意可能的死锁情况,特别是在复杂的进程间通信和协调场景下。
Semaphore
类
Semaphore 类是 multiprocessing 模块中提供的一种同步原语,用于控制对共享资源的访问数量。它可以被视为一个可用资源的计数器,是一种更为通用的同步机制,可以用来解决多个进程访问有限数量的资源问题。
构造方法
Semaphore
对象在创建时可以接受一个可选的整数值,用于指定信号量的初始值,即同时可以访问共享资源的进程数量。如果不指定,则默认值为1,此时它的行为类似于 Lock
。
from multiprocessing import Semaphoresem = Semaphore(value=3)
实例方法
-
acquire(block=True, timeout=None)
- 尝试减少信号量计数器的值(即获取资源)。如果计数器的值大于0,则减1并立即返回 True。如果计数器的值为0,则根据 block 参数的值行为会有所不同:
- 如果 block 为 True(默认值),调用将阻塞,直到其他进程释放资源(即调用 release()),计数器值变为大于0。
- 如果 block 为 False,则调用不会阻塞,立即返回 False。
- timeout 参数(仅当 block=True 时有效)允许指定阻塞的最长时间(秒)。如果在超时期间资源未被释放,则返回 False。
- 尝试减少信号量计数器的值(即获取资源)。如果计数器的值大于0,则减1并立即返回 True。如果计数器的值为0,则根据 block 参数的值行为会有所不同:
-
release()
- 释放资源,即将信号量计数器的值增加1。当计数器的值从0变为1时,正在等待这个信号量的其他进程将能够继续执行。
-
get_value()
(在某些Python版本中不推荐使用)- 返回信号量的当前值,即当前可用资源的数量。需要注意的是,因为进程间的状态可能会迅速变化,所以返回的值可能并不准确,主要用于调试目的。
示例
from multiprocessing import Process, Semaphore
import timesem = Semaphore(2) # 最多允许2个进程同时访问共享资源def worker(sem, num):with sem:print(f"Worker {num} is working.")time.sleep(num) # 模拟耗时操作print(f"Worker {num} finished.")if __name__ == "__main__":workers = [Process(target=worker, args=(sem, i)) for i in range(5)]for w in workers:w.start()for w in workers:w.join()print("All workers completed.")>>>
Worker 0 is working.
Worker 0 finished.
Worker 4 is working.
Worker 3 is working.
Worker 3 finished.
Worker 2 is working.
Worker 4 finished.
Worker 1 is working.
Worker 1 finished.
Worker 2 finished.
All workers completed.
注意事项
- 在使用
Semaphore
时应当注意,acquire
和release
方法调用需要成对出现,以避免信号量计数器的值被错误地修改,导致资源访问控制混乱。 Semaphore
可用于实现各种同步模式,包括限制对资源的并发访问、实现生产者消费者问题等。- 在设计多进程程序时,合理使用
Semaphore
可以有效地控制资源访问,防止资源竞争和冲突,但也需要注意避免死锁和其他同步问题。
Condition
类
Condition
类是 multiprocessing
模块中提供的一个同步原语,用于在进程之间等待某些条件的满足。它允许一个或多个进程等待某个条件变为真,而在条件满足时能够通知一个或多个等待的进程继续执行。Condition
常常与共享资源或状态变更相关的场景一起使用,提供了一种更加灵活的进程同步机制
构造方法
在创建 Condition 对象时,可以选择传递一个 Lock 或 RLock 对象用于内部使用。如果不传递,则 Condition 对象会自动创建一个新的 RLock 对象
from multiprocessing import Conditioncond = Condition()
实例方法
-
acquire(*args, **kwargs)
- 获得与条件关联的锁。这个方法直接调用内部锁对象的 acquire 方法,并且接受相同的参数。通常在调用 wait()、notify() 或 notify_all() 之前需要先调用 acquire() 获得锁。
-
release()
- 释放与条件关联的锁。这个方法直接调用内部锁对象的 release 方法。一般在调用 wait() 后,当条件满足继续执行后,需要释放锁。
-
wait(timeout=None)
- 让当前进程等待直到被通知或超时。调用这个方法时,进程必须已经获得了与条件关联的锁。方法会自动释放锁,然后进程被阻塞,直到其他进程在同一个条件变量上调用 notify() 或 notify_all()。一旦被通知,wait 方法重新获得锁然后返回。timeout 指定等待的最长时间,如果未指定,则无限等待。
-
notify(n=1)
- 通知等待这个条件的进程,使其继续执行。调用这个方法时,进程必须已经获得了与条件关联的锁。n 指定要唤醒的等待进程的数量,默认为1。
-
notify_all()
或notifyAll()
- 通知等待这个条件的所有进程,使其继续执行。调用这个方法时,进程必须已经获得了与条件关联的锁。
示例
from multiprocessing import Process, Condition, Array
import timedef producer(cond, shared_array):with cond:print("Producer adding item.")shared_array[0] += 1print("Producer done, item added.")cond.notify()def consumer(cond, shared_array):with cond:print("Consumer waiting for item.")cond.wait()print("Consumer consumed item.")shared_array[0] -= 1if __name__ == "__main__":condition = Condition()shared_array = Array('i', [0])p = Process(target=producer, args=(condition, shared_array))c = Process(target=consumer, args=(condition, shared_array))c.start()time.sleep(2) # 确保消费者先运行p.start()p.join()c.join()print("Final shared value:", shared_array[0])
>>>
Consumer waiting for item.
Producer adding item.
Producer done, item added.
Consumer consumed item.
Final shared value: 0
注意事项
- 使用
Condition
对象时,必须确保在调用wait()
、notify()
和notify_all()
方法前已经获得了与条件关联的锁。 wait()
方法在返回前会重新获得锁,这意味着调用wait()
后不需要再次显式调用acquire()
。notify()
和notify_all()
不会立即释放锁,而是在当前进程的锁释放操作(release()
)执行后,被通知的进程才能继续执行。- 正确使用
Condition
可以解决复杂的同步问题,但设计不当可能导致死锁或性能问题,因此需要谨慎使用。
进程池
Pool
类
Pool
类是 Python 的 multiprocessing
模块提供的一个高级接口,用于并行执行多个进程。Pool
类可以自动管理进程池中的进程数量,让你轻松地将任务分配给进程池执行,而无需手动管理每个进程的创建和终止。这对于执行 CPU 密集型任务特别有用
构造方法
创建 Pool 对象时,可以指定几个参数,最常用的是 processes 参数,它指定了进程池中进程的数量。如果不指定,默认为机器的 CPU 核心数
from multiprocessing import Poolpool = Pool(processes=4)
实例方法
-
apply(func, args=(), kwds={})
- 使用阻塞模式执行一个任务。func 是要执行的函数,args 是函数参数的元组,kwds 是函数参数的字典。这个方法只有在前一个任务完成后才会返回,然后返回函数的结果。
-
apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
- 使用非阻塞模式执行一个任务。参数与 apply 相同,但是会立即返回一个 AsyncResult 对象。你可以使用这个对象查询任务的状态或获取结果。callback 是一个可选的回调函数,当任务完成时调用。error_callback 是任务执行过程中发生异常时调用的函数。
-
map(func, iterable, chunksize=None)
- 类似于内置的 map() 函数,它会将 iterable 中的每个元素应用于 func 函数。这个方法会阻塞直到结果返回。使用 chunksize 参数可以控制每个任务的大小,对于非常大的迭代对象,通过合适的 chunksize 可以提高性能。
-
map_async(func, iterable, chunksize=None, callback=None, error_callback=None)
- 非阻塞版本的 map()。它立即返回一个 AsyncResult 对象,可以用来查询任务状态或获取结果。callback 和 error_callback 参数的作用与 apply_async 相同。
-
close()
- 关闭进程池,使其不再接受新的任务。但是已经提交的任务会继续执行直到完成。
-
join()
- 等待所有进程池中的进程执行完毕。close() 或 terminate() 方法被调用后才能使用 join()
示例
from multiprocessing import Pool
import time
import osdef square(x):time.sleep(1)return x * xif __name__ == '__main__':print(time.time())# 创建一个包含4个进程的进程池with Pool(processes=os.cpu_count()) as p:# 使用 map 函数分配任务results = p.map(square, range(10))print(results)# 使用 apply_async 异步执行任务async_result = p.apply_async(square, (10,))print(async_result.get()) # 使用 get 方法获取异步任务的结果print(time.time())
>>>
1734330507.251192
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
100
1734330510.379658
注意事项
- 使用
Pool
时,确保只在if __name__ == '__main__'
: 块中创建Pool
对象和执行任务。这可以防止在子进程中无意中创建新的子进rocesses
,导致程序崩溃或行为异常。 Pool
的map
和apply
方法会阻塞主进程,直到所有任务完成。如果需要非阻塞操作,应使用map_async
和apply_async
方法。- 在使用完
Pool
后,应调用close()
方法关闭进程池,然后调用join()
方法等待所有进程完成。这是良好的资源管理习惯,可以防止资源泄露。 terminate()
方法会立即停止所有进程,应谨慎使用,以免丢失未完成的工作
共享状态
Value
和 Array
类
在 Python 的 multiprocessing
模块中,Value
和 Array
是两个用于进程间共享数据的类。由于进程间共享内存并非像线程间共享全局变量那样简单,这两个类提供了一种在不同进程间安全共享数据的方法。
Value
Value
类用于在进程之间共享一个存储在共享内存中的单一数据值。它适用于当你只需要共享一个简单的数据元素,如一个整数或浮点数时。
使用方法:创建 Value
对象时,需要指定数据类型和初始值。数据类型是使用类似于 C 语言类型声明的字符串来指定的,例如 ‘i’ 表示整数,‘d’ 表示双精度浮点数。
from multiprocessing import Value# 创建一个共享的整数变量,初始值为 0
num = Value('i', 0)
访问和修改值:可以通过 .value 属性来访问和修改存储在 Value 中的数据。
# 读取值
print(num.value)# 修改值
num.value = 10
Array
Array
类用于在进程之间共享一个存储在共享内存中的数组。它适用于当你需要共享一组数据(如一系列整数或浮点数)时。
使用方法:创建 Array
对象时,同样需要指定数据类型和数组的大小。数据类型的指定方式与 Value 类似,数组的大小则通过一个整数来指定。
from multiprocessing import Array# 创建一个共享的整数数组,包含 10 个元素,初始值都为 0
arr = Array('i', 10)
访问和修改数组元素:Array 支持类似于列表的索引和切片操作来访问和修改数组中的元素。
# 读取第一个元素
print(arr[0])# 修改第一个元素
arr[0] = 1# 获取数组的长度
print(len(arr))# 使用切片
arr[1:3] = [2, 3]
Manager
类
Manager 类在 Python multiprocessing 模块中提供了一种方式,允许你在不同进程间共享数据。与 Value 和 Array 直接在共享内存中存储数据不同,Manager 类创建的数据结构是通过一个服务器进程管理的。这意味着它允许更加灵活的数据共享方式,不仅限于简单的数值或数组,也支持列表、字典、Namespace 等更复杂的数据结构。
基本使用
使用 Manager 类时,首先需要创建一个 Manager 对象,然后使用这个对象来创建共享的数据结构。
from multiprocessing import Managerwith Manager() as manager:# 创建一个共享的列表shared_list = manager.list([0, 1, 2])# 创建一个共享的字典shared_dict = manager.dict({'a': 1, 'b': 2})
创建的共享数据结构可以像普通的数据结构那样被访问和修改,但是它们实际上是在一个单独的服务器进程中维护的。这意味着你可以安全地在多个进程间共享和修改这些数据结构,而无需担心进程安全问题。
Manager 支持的类型
Manager 类支持多种类型的共享数据结构,包括但不限于:
- list: 共享列表
- dict: 共享字典
- Value: 存储单个值的共享对象,类似于 multiprocessing.Value
- Array: 存储多个值的共享数组,类似于 multiprocessing.Array
- Namespace: 允许访问属性的简单方式,类似于一个简单的类或结构体
示例代码
以下示例展示了如何使用 Manager 类在进程间共享列表和字典:
from multiprocessing import Process, Managerdef worker(shared_list, shared_dict):shared_list.append('new item')shared_dict['new_key'] = 'new value'if __name__ == "__main__":with Manager() as manager:shared_list = manager.list([1, 2, 3])shared_dict = manager.dict({'key1': 'value1', 'key2': 'value2'})p = Process(target=worker, args=(shared_list, shared_dict))p.start()p.join()print(f"Shared list: {shared_list}")print(f"Shared dict: {shared_dict}")
其他组件
multiprocessing 模块提供了一些用于查询系统和进程状态的函数,这些函数对于编写并发程序时了解资源利用情况和进程管理非常有用。以下是对 cpu_count、current_process 和 active_children 三个方法的详细解释:
cpu_count()
cpu_count
函数返回机器上可用的 CPU 核心数。这个信息对于决定并行程序中进程池(Pool)的大小非常有用,因为你可能想要根据可用的处理器数量来优化你的程序性能。
使用场景:在创建进程池时,可以根据 cpu_count 的返回值来设置进程池的大小。例如,如果你的机器有 4 个 CPU 核心,那么创建一个包含 4 个进程的进程池可能是一个合理的选择。
from multiprocessing import cpu_countprint(f"Number of CPUs: {cpu_count()}")
current_process()
current_process
函数返回当前进程的信息。这个函数返回一个 Process 对象,其中包含有关当前进程的详细信息,如进程的名称、PID(进程ID)等。
使用场景:这个函数在调试并发程序时特别有用,因为你可以用它来识别当前正在执行的进程。例如,在多进程环境中打印日志时,可能会包含进程的名称或 PID 来区分日志消息是从哪个进程生成的。
from multiprocessing import current_processprocess = current_process()
print(f"Current process name: {process.name}")
print(f"Current process ID: {process.pid}")
active_children()
active_children
函数返回一个列表,包含当前活跃的子进程对象。每个子进程对象都包含有关该进程的信息,如名称、PID 等。这个函数可以在父进程中调用,以获取当前所有活跃的子进程列表。
使用场景:在管理和监控基于 multiprocessing 的并发程序时,active_children 函数可以帮助你了解有多少子进程正在运行。这对于确保所有预期内的子进程都已启动,或者在程序结束时检查是否还有未完成的子进程非常有用。
from multiprocessing import Process, active_children
import timedef worker():print("Worker sleeping...")time.sleep(2)print("Worker done.")if __name__ == "__main__":for _ in range(2):p = Process(target=worker)p.start()time.sleep(1) # 等待一段时间,让子进程启动for child in active_children():print(f"Active child: PID={child.pid}, Name={child.name}")