一、concurrent.futures库的介绍
concurrent.futures
是 Python 标准库中的一个模块,用于实现异步编程,它提供了一个高级接口来处理异步执行的可调用对象(如函数)。这个模块主要用于简化线程和进程的并发编程,使得开发者可以更方便地利用多核处理器的优势来提高程序的性能。
二、concurrent.futures库的特点
- 简单易用:
- 它提供了简洁的接口,通过
ThreadPoolExecutor
(线程池)和ProcessPoolExecutor
(进程池)这两个主要的类来管理线程和进程。例如,使用线程池执行任务时,只需要创建一个ThreadPoolExecutor
对象,然后通过submit
方法提交任务即可。
- 它提供了简洁的接口,通过
- 自动资源管理:
- 线程池和进程池会自动管理线程和进程的创建与销毁。以线程池为例,它会根据任务的数量和系统资源自动调整线程的数量,避免了手动创建和销毁线程可能导致的资源浪费和错误。当任务完成后,线程会被自动回收,而不是一直占用系统资源。
- 高并发支持:
- 可以同时处理多个任务。比如在一个有多个网络请求的场景中,使用
ThreadPoolExecutor
可以并发地发送这些请求,而不是一个一个地顺序发送。这样可以大大提高程序的执行效率,尤其是在处理 I/O 密集型任务时效果更加明显。
- 可以同时处理多个任务。比如在一个有多个网络请求的场景中,使用
- 任务状态跟踪:
- 能够方便地跟踪任务的状态。通过
Future
对象(submit
方法返回的对象),可以检查任务是否完成、获取任务的结果或者检查任务是否被取消等。例如,future.done()
方法返回True
表示任务已经完成,future.result()
方法可以获取任务执行的结果。
- 能够方便地跟踪任务的状态。通过
三、concurrent.futures库的简单使用
线程池示例(ThreadPoolExecutor)
首先导入模块:
import concurrent.futures
import time
定义一个简单的函数用于演示
def task(num):print(f"任务 {num} 开始执行")time.sleep(2) # 模拟任务执行时间print(f"任务 {num} 执行完成")return num * 2
使用线程池执行任务:
with concurrent.futures.ThreadPoolExecutor(max_workers = 3) as executor:# 提交任务,返回Future对象列表futures = [executor.submit(task, i) for i in range(5)]for future in concurrent.futures.as_completed(futures):try:result = future.result()print(f"任务结果: {result}")except Exception as e:print(f"任务出错: {e}")
- 在上述代码中,首先创建了一个线程池,
max_workers
参数指定了线程池中的最大线程数为 3。然后通过列表推导式提交了 5 个任务,每个任务是调用task
函数并传入一个不同的参数。as_completed
函数用于迭代已经完成的任务,通过future.result()
获取任务的结果。
进程池示例(ProcessPoolExecutor)
- 导入模块和定义函数部分与线程池示例相同。
- 使用进程池执行任务:
with concurrent.futures.ProcessPoolExecutor(max_workers = 3) as executor:futures = [executor.submit(task, i) for i in range(5)]for future in concurrent.futures.as_completed(futures):try:result = future.result()print(f"任务结果: {result}")except Exception as e:print(f"任务出错: {e}")
- 进程池的使用方式和线程池类似,主要的区别在于
ProcessPoolExecutor
会使用进程而不是线程来执行任务。在处理 CPU 密集型任务时,进程池通常能更好地利用多核处理器的优势,因为每个进程有自己独立的内存空间和 CPU 核心,避免了 Python 中全局解释器锁(GIL)对多线程执行 CPU 密集型任务的限制。不过,进程池的开销比线程池大,因为进程的创建和销毁需要更多的系统资源。
四、concurrent.futures库的完整示例
1. 线程池高级用法示例
示例一:设置回调函数处理任务结果
import concurrent.futures
import timedef task(num):print(f"任务 {num} 开始执行")time.sleep(2) # 模拟任务执行时间print(f"任务 {num} 执行完成")return num * 2def callback(future):try:result = future.result()print(f"回调函数处理结果: {result}")except Exception as e:print(f"处理结果出错: {e}")with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:futures = [executor.submit(task, i) for i in range(5)]for future in futures:future.add_done_callback(callback)
在这个示例中,定义了callback
函数作为回调函数,当每个任务对应的Future
对象完成时(也就是任务执行完成后),会自动调用callback
函数来处理结果。这样可以方便地在任务完成的第一时间进行额外的操作,比如记录日志、更新状态等。
示例二:限制任务提交速率(使用Semaphore
)
import concurrent.futures
import time
import threading# 信号量,限制同时执行的任务数量
semaphore = threading.Semaphore(2)def task(num):with semaphore:print(f"任务 {num} 开始执行")time.sleep(2) # 模拟任务执行时间print(f"任务 {num} 执行完成")return num * 2with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:futures = [executor.submit(task, i) for i in range(10)]for future in concurrent.futures.as_completed(futures):try:result = future.result()print(f"任务结果: {result}")except Exception as e:print(f"任务出错: {e}")
这里利用了 Python 的threading.Semaphore
(信号量),在定义的task
函数中通过with semaphore
语句来限制同一时刻最多只有 2 个任务在执行,即便线程池的最大线程数设置为 5,也能起到控制任务提交速率的作用,避免一下子提交过多任务导致资源耗尽等问题,常用于需要控制并发量的场景,比如对接口调用频率有限制的情况。
2. 进程池高级用法示例
示例一:共享数据(使用Manager
)
from concurrent.futures import ProcessPoolExecutor
import multiprocessingdef worker(num, shared_list):shared_list.append(num * 2)print(f"进程 {multiprocessing.current_process().pid} 处理任务 {num}")if __name__ == "__main__":manager = multiprocessing.Manager()shared_list = manager.list()with ProcessPoolExecutor(max_workers=3) as executor:tasks = [executor.submit(worker, i, shared_list) for i in range(5)]for task in tasks:task.result()print(f"共享列表最终结果: {shared_list}")
在多进程编程中,各个进程有独立的内存空间,直接共享数据会有问题。通过multiprocessing.Manager
创建的共享对象(这里是manager.list()
创建的列表),可以实现在不同进程间共享数据。在worker
函数中,各个进程可以对这个共享列表进行操作,最终在主进程中可以看到汇总后的结果。
示例二:进程间通信结合队列(使用Queue
)
from concurrent.futures import ProcessPoolExecutor
import multiprocessingdef producer(num, queue):queue.put(num * 2)print(f"进程 {multiprocessing.current_process().pid} 生产数据 {num * 2}")def consumer(queue):while True:if not queue.empty():data = queue.get()print(f"进程 {multiprocessing.current_process().pid} 消费数据 {data}")breakif __name__ == "__main__":queue = multiprocessing.Queue()with ProcessPoolExecutor(max_workers=3) as executor:producer_futures = [executor.submit(producer, i, queue) for i in range(3)]for future in producer_futures:future.result()consumer_future = executor.submit(consumer, queue)consumer_future.result()
这里利用multiprocessing.Queue
实现了进程间的通信,producer
函数作为生产者向队列中放入数据,consumer
函数作为消费者从队列中获取数据并处理。通过ProcessPoolExecutor
在不同进程中执行这些函数,实现了简单的多进程间数据的传递和处理,常用于多进程协同完成复杂任务,例如一个进程负责数据采集,另一个进程负责数据分析等场景。