Table of Contents
I. Introduction
II. Installation
III. Program Examples
1. Hello
2. Simple Send/Receive
3. Point-to-Point Communication
4. Broadcast
5. Scatter
6. Gather
7. Reduce
8. Working with NumPy
I. Introduction
Mpi4py is a library for parallel computing in Python. It is built on the MPI (Message Passing Interface) standard and provides a set of functions and tools for scattering, gathering, and send/receive operations in distributed computing environments. With Mpi4py, Python programs can use the MPI interface for multi-process parallel and even distributed high-performance computing. MPI itself is a standard for writing parallel programs, consisting of a protocol and semantic specification designed for high performance, scalability, and portability.
Mpi4py is widely used in scientific computing, engineering simulation, large-scale data processing, and related fields, and is particularly well suited to tasks that require large-scale parallelism. With it, complex parallel algorithms can be implemented in Python with little effort, improving computational speed and efficiency.
For more on MPI concepts, its interfaces, and example implementations, see:
并行编程 – 猿核试Bug愁的博客 (CSDN): https://blog.csdn.net/l_peanut/category_12678352.html?spm=1001.2014.3001.5482
II. Installation
First install MPICH or OpenMPI; OpenMPI is used as the example here.
Install it with the following commands in a terminal:
wget https://download.open-mpi.org/release/open-mpi/v5.0/openmpi-5.0.4.tar.gz
tar xzvf openmpi-5.0.4.tar.gz
cd openmpi-5.0.4
mkdir -p /home/name/install/openmpi    # create the install directory
./configure --prefix=/home/name/install/openmpi    # --prefix is the install path; replace it with your own
make all install
Then add the bin and lib directories under the install path to your environment variables:
vim ~/.profile
Add the following lines to the file:
export PATH=/home/name/install/openmpi/bin:$PATH
export LD_LIBRARY_PATH=/home/name/install/openmpi/lib:$LD_LIBRARY_PATH
Then apply the changes:
source ~/.profile
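With OpenMPI on the PATH, mpi4py itself still needs to be installed into the Python environment. A minimal sketch, assuming pip and the freshly installed mpiexec are both first on the PATH:
pip install mpi4py
mpiexec --version    # confirm which MPI runtime is being picked up
python -c "from mpi4py import MPI; print(MPI.Get_library_version())"    # confirm mpi4py links against it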
III. Program Examples
Usage:
from mpi4py import MPI
To run a script:
mpiexec -np 2 python ./test.py    # -np 2 means run with 2 parallel processes
1. Hello
Start with Hello world!:
from mpi4py import MPI

print("hello world")
Output:
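A slightly extended variant (a minimal sketch using the standard Get_rank/Get_size calls) makes it clearer that every process runs the same script:
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()   # this process's ID
size = comm.Get_size()   # total number of processes
print("hello world from rank %d of %d" % (rank, size))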
2. Simple Send/Receive
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    msg = 'I am rank 0'
    comm.send(msg, dest=1)       # rank 0 sends a message to rank 1
elif rank == 1:
    s = comm.recv()              # rank 1 receives it (from any source by default)
    print("rank %d: %s" % (rank, s))
else:
    print("rank %d: My status: idle" % (rank))
Output:
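mpi4py also provides non-blocking counterparts, isend/irecv, which return a Request object. A minimal sketch of the same two-process exchange (the tag value 11 is an arbitrary choice):
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    req = comm.isend('I am rank 0', dest=1, tag=11)  # start the send without blocking
    req.wait()                                       # complete it
elif rank == 1:
    req = comm.irecv(source=0, tag=11)               # start the receive
    msg = req.wait()                                 # wait() returns the received object
    print("rank %d: %s" % (rank, msg))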
3. Point-to-Point Communication
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

# Point-to-point communication: each process sends its data_send
# (e.g. rank 0 sends [0, 0, 0, 0, 0]) to the next process, (rank + 1) % size
data_send = [comm_rank] * 5
comm.send(data_send, dest=(comm_rank + 1) % comm_size)
# Each process receives the data sent by the previous process into data_recv
data_recv = comm.recv(source=(comm_rank - 1) % comm_size)
print("my rank is %d, and I received:" % comm_rank)
print(data_recv)
Output:
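Blocking send in a ring like this can deadlock once messages are too large to be buffered, because every process sends before anyone receives. comm.sendrecv combines the two operations and sidesteps that; a sketch of the same ring under the same data layout:
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

data_send = [comm_rank] * 5
# send to the next rank and receive from the previous rank in a single call
data_recv = comm.sendrecv(data_send,
                          dest=(comm_rank + 1) % comm_size,
                          source=(comm_rank - 1) % comm_size)
print("my rank is %d, and I received:" % comm_rank)
print(data_recv)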
4. Broadcast
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

# On rank 0, data is range(0, comm_size)
if comm_rank == 0:
    data = range(comm_size)
# Every process receives the data broadcast from rank 0 and stores it in recv
recv = comm.bcast(data if comm_rank == 0 else None, root=0)
print('rank %d, got:' % (comm_rank))
print(recv)
Output:
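For NumPy arrays, the uppercase, buffer-based Bcast avoids pickling and is usually faster. A minimal sketch, assuming every rank allocates a buffer of the same shape and dtype:
import mpi4py.MPI as MPI
import numpy as np

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()

data = np.empty(5, dtype='i')          # every rank allocates the same buffer
if comm_rank == 0:
    data[:] = np.arange(5, dtype='i')  # root fills in the values
comm.Bcast(data, root=0)               # broadcast in place, no pickling
print('rank %d, got:' % comm_rank)
print(data)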
5. Scatter
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

# Rank 0 prepares one element per process; the other ranks pass None
if comm_rank == 0:
    data = range(comm_size)
else:
    data = None
# Each process receives exactly one element of data
local_data = comm.scatter(data, root=0)
print('rank %d, got:' % comm_rank)
print(local_data)
Output:
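scatter hands exactly one item of the list to each process; to give every rank several values, the root can pre-split the data into comm_size chunks first. A sketch, assuming three values per process:
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    full = list(range(comm_size * 3))
    # split into comm_size chunks of 3 elements each
    data = [full[i * 3:(i + 1) * 3] for i in range(comm_size)]
else:
    data = None
local_chunk = comm.scatter(data, root=0)   # each rank gets one chunk
print('rank %d, got chunk:' % comm_rank)
print(local_chunk)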
6. Gather
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
else:
    data = None
# Rank 0 scatters its data, range(0, comm_size), to the other processes
local_data = comm.scatter(data, root=0)
# Each process doubles the value it received
local_data = local_data * 2
# Each process prints its doubled local_data
print('rank %d, got and do:' % comm_rank)
print(local_data)
# Gather the data; rank 0 collects the results
combine_data = comm.gather(local_data, root=0)
# Rank 0 prints the gathered combine_data
if comm_rank == 0:
    print("root recv {0}".format(combine_data))
Output:
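gather delivers the combined list only to the root; when every process needs it, allgather performs the same collection without a designated root. A minimal sketch:
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()

local_data = comm_rank * 2
# every rank receives the full list [0, 2, 4, ...]
combine_data = comm.allgather(local_data)
print("rank %d sees %s" % (comm_rank, combine_data))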
7. Reduce
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if comm_rank == 0:
    data = range(comm_size)
else:
    data = None
# Rank 0 scatters data, range(0, comm_size)
local_data = comm.scatter(data, root=0)
# Each process doubles the value it received
local_data = local_data * 2
print('rank %d, got and do:' % comm_rank)
print(local_data)
# Rank 0 performs the reduction with op=MPI.SUM
all_sum = comm.reduce(local_data, root=0, op=MPI.SUM)
# Rank 0 prints the reduced result
if comm_rank == 0:
    print('sum is:%d' % all_sum)
Output:
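reduce accepts other operations besides MPI.SUM (e.g. MPI.MAX, MPI.MIN, MPI.PROD), and allreduce returns the result on every rank rather than only on the root. A short sketch:
import mpi4py.MPI as MPI

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()

local_data = comm_rank * 2
total = comm.allreduce(local_data, op=MPI.SUM)    # every rank gets the sum
biggest = comm.allreduce(local_data, op=MPI.MAX)  # every rank gets the maximum
print("rank %d: sum=%d, max=%d" % (comm_rank, total, biggest))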
8. Working with NumPy
import mpi4py.MPI as MPI
import numpy as np

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()
comm_size = comm.Get_size()

if __name__ == "__main__":
    # Create a matrix on rank 0
    if comm_rank == 0:
        all_data = np.arange(20).reshape(4, 5)
        print("*************** data start ***************")
        print(all_data)
        print("*************** data end ***************")
    # Broadcast the array to every process
    all_data = comm.bcast(all_data if comm_rank == 0 else None, root=0)
    # Partition the rows among the processes
    num_samples = all_data.shape[0]
    local_data_offset = np.linspace(0, num_samples, comm_size + 1).astype('int')
    # Each process takes its slice of the full matrix according to its offsets
    local_data = all_data[local_data_offset[comm_rank]:local_data_offset[comm_rank + 1]]
    print("*************** %d of %d processors gets local data ***************" % (comm_rank, comm_size))
    print(local_data)
    # Each process sums its own slice (local sum)
    local_sum = local_data.sum()
    # Rank 0 reduces the local sums into a global sum
    all_sum = comm.reduce(local_sum, root=0, op=MPI.SUM)
    # Each process squares its local slice
    local_result = local_data ** 2
    # All processes gather the squared slices (allgather)
    result = comm.allgather(local_result)
    result = np.vstack(result)
    # Rank 0 prints the reduced sum and the assembled squared matrix
    if comm_rank == 0:
        print("********** sum: ", all_sum)
        print("********** result **********")
        print(result)
Output:
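As with Bcast, the uppercase buffer-based collectives are the faster path for NumPy data. A minimal sketch of an elementwise Reduce into a buffer that only the root allocates:
import mpi4py.MPI as MPI
import numpy as np

comm = MPI.COMM_WORLD
comm_rank = comm.Get_rank()

local = np.full(5, comm_rank, dtype='d')           # each rank contributes its own values
total = np.zeros(5, dtype='d') if comm_rank == 0 else None
comm.Reduce(local, total, op=MPI.SUM, root=0)      # elementwise sum into the root's buffer
if comm_rank == 0:
    print("elementwise sum:", total)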