目录
一.进程间通信目的
二.管道
三.匿名管道
3.1用fork来共享管理管道
3.2站在文件描述符角度-深度理解管道
3.3内核角度
3.4管道样例
3.4.1测试管道读写
3.4.2代码
解决方案1:倒着关闭:
解决方案2: 只让父进程一个人指向写端
四.命名管道
4.1创建命名管道
4.2命名管道与匿名管道的区别
4.2命名管道的打开规则
4.3用命名管道实现server&client通信
五.system V 共享内存
5.1共享内存示意图
5.2共享内存函数
5.3代码
一.进程间通信目的
• 数据传输:一个进程需要将它的数据发送给另一个进程
• 资源共享:多个进程之间共享同样的资源。
• 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进 程终止时要通知父进程)。
• 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够 拦截另⼀个进程的所有陷入和异常,并能够及时知道它的状态改变。
二.管道
• 管道是Unix中最古老的进程间通信的形式。
• 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
三.匿名管道
#include <unistd.h>
功能:创建⼀⽆名管道
原型int pipe(int fd[2]);
参数
fd:⽂件描述符数组,其中fd[0]表⽰读端, fd[1]表⽰写端
返回值:成功返回0,失败返回错误代码
3.1用fork来共享管理管道
3.2站在文件描述符角度-深度理解管道
3.3内核角度
3.4管道样例
3.4.1测试管道读写
第一种情况:
写慢读快:
#include<iostream>
#include<cstdio>
#include<unistd.h>
#include<cstring>
#include <sys/types.h>
#include <sys/wait.h>void ChildWrite(int wfd)
{char buffer[1024];int cnt=0;while(true){snprintf(buffer,sizeof(buffer),"I am child,pid:%d,cnt:%d",getpid(),cnt++);write(wfd,buffer,strlen(buffer));sleep(1);}
}void FatherRead(int rfd)
{char buffer[1024];while(true){//buffer[0]=0;ssize_t n=read(rfd,buffer,sizeof(buffer)-1);if(n>0){buffer[n]=0;std::cout<<"child say:"<<buffer<<std::endl;}}
}int main()
{int fds[2]={0};int n=pipe(fds);if(n<0){std::cerr<<"pipe error"<<std::endl;return 1;}pid_t id=fork();if(id == 0){//childclose(fds[0]);ChildWrite(fds[1]);close(fds[1]);exit(0);}//fatherclose(fds[1]);FatherRead(fds[0]);waitpid(id,nullptr,0);close(fds[0]);return 0;
}
根据代码可以看出,写端每隔一秒就写一段话,读端我们没有继续睡眠等待,这里读的速度是很快的,所以在后面read的时候就会阻塞等待,读也就是会一直卡在read里,等待子进程继续写。
第二种情况:
写快读慢:
两者切换一下顺序,写端不再睡眠,读端睡眠一秒:
这里一下就把管道文件写满了,写就会阻塞等待,等待读端来读。
第三种情况:
写关,继续读:
如果父进程不加break跳出循环的就会一直读(不会阻塞):
read会读到返回值为0,表示文件结尾。
第四种情况:
读关闭,写继续:
也就是说子进程只写,但是没有人读,这种情况没有任何意义。因为通信就是要进行交互的。OS不会做没有意义的事情,所以会杀死进程,然后发送异常信号。
我们把读端关掉之后,OS会直接杀死子进程,因为子进程的写端没有意义了,信号是13:
3.4.2代码
Main.cc
#include"ProcessPool.hpp"
int main()
{//构建通信管道ProcessPool pp(gdefaultnum);pp.Creat();// pp.Debug();int task_node=0;int cnt=10;while (cnt--){//1.选择一个信道//pp.PushTask(task_node++);pp.Run();sleep(1);}pp.Stop();//sleep(100);return 0;
}
ProcessPool.hpp
#ifndef __PROCESS_POOL_HPP_
#define __PROCESS_POOL_HPP_#include <iostream>
#include <vector>
#include <unistd.h>
#include <cstdlib>
#include <sys/wait.h>
#include "Task.hpp"
class Channel
{
public:Channel(int fd, pid_t id) : _wfd(fd), _subid(id){_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);}void Send(int code){int n = write(_wfd, &code, sizeof(code));(void)n;}void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subid, nullptr, 0);(void)rid;}int GetWfd() { return _wfd; }pid_t GetSubid() { return _subid; }std::string GetName() { return _name; }~Channel() {}private:int _wfd;pid_t _subid;std::string _name;
};class ChannelManager
{
public:ChannelManager() : _next(0){}void InsertChannel(int wfd, pid_t subid) // 插入新创建的管道,管理起来{Channel c(wfd, subid);_channels.push_back(c);}Channel &Select() // 选择一个管道工作{auto &c = _channels[_next];_next++;_next %= _channels.size();return c;}void PrintChannel(){for (auto &channel : _channels){std::cout << channel.GetName() << std::endl;}}void StopProcess(){for (auto &channel : _channels){channel.Close();std::cout << "关闭:" << channel.GetName() << std::endl;}}void WaitSubProcess(){for (auto &channel : _channels){channel.Wait();std::cout << "回收:" << channel.GetName() << std::endl;}}~ChannelManager() {}private:std::vector<Channel> _channels;int _next;
};const int gdefaultnum = 5; // 多少个管道
class ProcessPool
{
public:ProcessPool(int num) : _process_num(num){_tm.Register(PrintLog);_tm.Register(DownLoad);_tm.Register(UpLoad);}void Work(int rfd){while (true){// std::cout<<"我是子进程,我的rfd是:"<<rfd<<std::endl;// sleep(5);int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if (n > 0){if (n != sizeof(code)) // 读的不规范,上去重新读{continue;}std::cout << "子进程[" << getpid() << "]收到任务码:" << code << std::endl;// 执行任务_tm.Execute(code);}else if (n == 0) // 写端关闭{std::cout << "子进程退出" << std::endl;break;}else{std::cout << "读取错误" << std::endl;break;}}}bool Creat(){for (int i = 0; i < gdefaultnum; i++){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return false; // 管道建立失败// 2.创建子进程pid_t subid = fork(); // 子读父写if (subid == 0){// 子进程// 3.关闭不需要的文件描述符close(pipefd[1]);Work(pipefd[0]); // 这是子进程需要做的工作close(pipefd[0]);exit(0);}else if (subid > 0){// 父进程// 3.关闭不需要的文件描述符close(pipefd[0]);_cm.InsertChannel(pipefd[1], subid); // 构建一个信道}else{// 失败return false;}}return true;}void Debug(){_cm.PrintChannel();}void Run(){// 0.选择一个任务int taskcode = _tm.Code();// 1.选择一个信道(子进程),必须负载均衡auto &c = _cm.Select();std::cout << "选择了一个子进程:" << c.GetName() << std::endl;// 2.发送任务c.Send(taskcode);std::cout << "发送一个任务码" << taskcode << std::endl;}void Stop(){// 关闭父进程所有的wfd,不再写入,子进程读的时候会读到文件末尾,我们的else if(n==0) 后面有break_cm.StopProcess();// 回收所有子进程_cm.WaitSubProcess();}~ProcessPool() {}private:ChannelManager _cm;int _process_num;TaskManager _tm;
};#endif
Task.hpp
#pragma once
#include<iostream>
#include<vector>
#include<ctime>
typedef void(*task_t)();void PrintLog()
{std::cout<<"我是一个打印日志的任务"<<std::endl;
}
void DownLoad()
{std::cout<<"我是一个下载的任务"<<std::endl;
}
void UpLoad()
{std::cout<<"我是一个上传的任务"<<std::endl;
}class TaskManager
{
public:TaskManager(){srand(time(nullptr));}void Register(task_t t){_tasks.push_back(t);}int Code(){return rand()%_tasks.size();}void Execute(int code){if(code>=0&&code<_tasks.size()){_tasks[code]();}}~TaskManager(){}
private:std::vector<task_t> _tasks;
};
上面的代码有一个问题
如果把关闭和回收放到一起,上面的是分开放的,这是没问题的,那么下面的这种写法会有什么问题?
等待十秒之后,我们预料的关闭和回收并没有被打印出来:
我们知道每一个进程都有一个文件描述符表,其中0,1,2是被默认打开的标准输入和标准输出,当我们在创建管道文件的时候,在文件描述符表中就会写入假设是3和4。父进程在进行创建子进程的时候,子进程会继承父进程的文件描述符表。
上面的代码。在创建子进程的时候,子进程会继承父进程的文件描述符表,所以子进程的文件描述符表需要关闭写端,父进程需要关闭读端。这是第一次进入循环。
第二次循环,再次创建管道文件,因为父进程的3已经被关闭了,所以重新分配的话会分配3和5,后面依然会关闭3,此时就要创建新的子进程了,那么这个子进程有了上面的经验,这个子进程就会进程父进程所有的东西,包括上面父进程的4。此时指向第一个struct file文件的指针会多一个,它的引用计数就会变成2.
以此类推的话,创建5个管道的话就会有5个指针指向第一个管道文件,在我们关闭第一个文件的时候它的引用计数会减为4,所以父进程在close的时候只是引用计数减一,并没有确切的关闭这个文件,所以在进入后面的代码的时候,子进程会阻塞等待wait。就会卡住在第一个循环里。
怎么解决?
解决方案1:倒着关闭:
void CloseAndWait(){// for (auto &channel : _channels)// {// channel.Close();// std::cout << "关闭:" << channel.GetName() << std::endl;// channel.Wait();// std::cout << "回收:" << channel.GetName() << std::endl;// }for(int i=_channels.size() - 1;i>=0;i--){_channels[i].Close();std::cout << "关闭:" << _channels[i].GetName() << std::endl;_channels[i].Wait();std::cout << "回收:" << _channels[i].GetName() << std::endl;}}
解决方案2: 只让父进程一个人指向写端
上面的方法并不好,虽然解决了问题,这里我们希望只让父进程的写端指向我们的文件,所以我们在每一次的创建新的子进程的时候,让子进程把它的兄弟进程的写端给关掉,根本不增加引用计数的机会。
我们只需要在manager里添加一个方法:
void CloseAll(){for(auto &channel:_channels){channel.Close();}}
之后运行这个:
就没有问题了:
在子进程里加上这个方法就行了:
比较细节的一点就是,注意我们写CloseAll函数的时候,是把里面的所有的写端都给关闭了。因为有写时拷贝的缘故,这里并不会把父进程的写端给关闭掉。比如第一次循环,_cm进程池里就什么都没有,因为创建子进程时就立刻继承了父进程。
四.命名管道
• 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
• 如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名 管道。
• 命名管道是一种特殊类型的文件
4.1创建命名管道
命令行创建管道:
命名管道也可以从程序里创建,相关函数有:
int mkfifo(const char *filename,mode_t mode);
4.2命名管道与匿名管道的区别
• 匿名管道由pipe函数创建并打开。
• 命名管道由mkfifo函数创建,打开用open
• FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些 工作完成之后,它们具有相同的语义。
4.2命名管道的打开规则
• 如果当前打开操作是为读而打开FIFO时
◦ O_NONBLOCK disable:阻塞直到有相应进程为写而打开该FIFO
◦ O_NONBLOCK enable:立刻返回成功
• 如果当前打开操作是为写而打开FIFO时
◦ O_NONBLOCK disable:阻塞直到有相应进程为读而打开该FIFO
◦ O_NONBLOCK enable:立刻返回失败,错误码为ENXIO
4.3用命名管道实现server&client通信
server.cc
#include "comm.hpp"
// int main()
// {
// umask(0);
// //新建管道
// int n=mkfifo(FIFO_FILE,0666);//运行时先运行这一个,因为要创建管道
// if(n!=0)
// {
// std::cerr<< "mkfifo error"<<std::endl;
// return 1;
// }
// std::cout<<"make fifo success"<<std::endl;// //打开
// //write方没有执行open的时候,read方就要在open内部阻塞
// //直到有人把管道文件打开了,open才会返回
// //这也就是为什么虽然我们创建管道文件成功了,但是后面没有打印open fifo success的原因
// int fd=open(FIFO_FILE,O_RDONLY);
// if(fd<0)
// {
// std::cerr<< "open fifo error"<<std::endl;
// return 2;
// }
// std::cout<<"open fifo success"<<std::endl;// //读
// char buffer[1024];
// while (true)
// {
// int num=read(fd,buffer,sizeof(buffer)-1);
// if(num>0)
// {
// buffer[num]=0;
// std::cout<<"client say:"<<buffer<<std::endl;
// }
// else if(num==0)
// {
// std::cout<<"if client quit,server quit too"<<std::endl;
// break;
// }
// else
// {
// std::cout<<"read error"<<std::endl;
// break;
// }
// }
// close(fd);// //删除命名管道
// n = unlink(FIFO_FILE);
// if(n == 0)
// {
// std::cout<<"remove fifo success"<<std::endl;
// }
// else
// {
// std::cout<<"remove fifo failed"<<std::endl;
// }// return 0;
// }int main()
{// 创建一个类来实现NamedFifo fifo(PATH, FILENAME);//下面进行文件操作FileOper readfile(PATH,FILENAME);readfile.OpenForRead();readfile.Read();readfile.Close();return 0;
}
client.cc
// #include<iostream>
// #include<sys/types.h>
// #include<sys/stat.h>
// #include<string>
// #include<fcntl.h>
// #include<unistd.h>
#include"comm.hpp"
// int main()
// {
// int fd=open(FIFO_FILE,O_WRONLY);
// if(fd<0)
// {
// std::cerr<< "open fifo error"<<std::endl;
// return 2;
// }
// pid_t id=getpid();
// int cnt=1;
// while (true)
// {
// std::cout<<"please Enter#";
// std::string message;
// //std::cin>>message;
// std::getline(std::cin,message);
// message+=(",message number:"+std::to_string(cnt++)+"["+std::to_string(id)+"]");// int n=write(fd,message.c_str(),message.size());
// if(n>0)
// {
// }
// }// close(fd);
// return 0;
// }int main()
{FileOper writefile(PATH,FILENAME);writefile.OpenForWrite();writefile.Write();writefile.Close();return 0;
}
comm.hpp
#pragma once#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <string>
#include <fcntl.h>
#include <unistd.h>
#define FIFO_FILE "fifo"#define FILENAME "fifo"
#define PATH "."class NamedFifo
{
public:NamedFifo(const std::string &path, const std::string &name): _path(path), _name(name){_fifoname = _path + "/" + _name;umask(0);// int n = mkfifo(FIFO_FILE, 0666);int n = mkfifo(_fifoname.c_str(), 0666);if (n < 0){std::cerr << "mkfifo error" << std::endl;}else{std::cerr << "mkfifo success" << std::endl;}}~NamedFifo(){int n = unlink(_fifoname.c_str());if (n == 0){std::cout << "remove fifo success" << std::endl;}else{std::cout << "remove fifo failed" << std::endl;}}private:std::string _path;std::string _name;std::string _fifoname;
};class FileOper
{
public:FileOper(const std::string &path, const std::string &name): _path(path), _name(name), _fd(-1){_fifoname = _path + "/" + _name;}void OpenForRead(){_fd = open(FIFO_FILE, O_RDONLY);if (_fd < 0){std::cerr << "open fifo error" << std::endl;return;}std::cout << "open fifo success" << std::endl;}void OpenForWrite(){_fd = open(FIFO_FILE, O_WRONLY);if (_fd < 0){std::cerr << "open fifo error" << std::endl;return;}std::cout << "open fifo success" << std::endl;}void Write(){pid_t id = getpid();int cnt = 1;while (true){std::cout << "please Enter#";std::string message;// std::cin>>message;std::getline(std::cin, message);message += (",message number:" + std::to_string(cnt++) + "[" + std::to_string(id) + "]");int n = write(_fd, message.c_str(), message.size());if (n > 0){}}}void Read(){char buffer[1024];while (true){int num = read(_fd, buffer, sizeof(buffer) - 1);if (num > 0){buffer[num] = 0;std::cout << "client say:" << buffer << std::endl;}else if (num == 0){std::cout << "if client quit,server quit too" << std::endl;break;}else{std::cout << "read error" << std::endl;break;}}}void Close(){if (_fd > 0)close(_fd);}~FileOper(){}private:std::string _path;std::string _name;std::string _fifoname;int _fd;
};
五.system V 共享内存
共享内存区是最快的IPC形式。一旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据
5.1共享内存示意图
5.2共享内存函数
shmget:
参数
key:这个共享内存段名字
size:共享内存大小
shmflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的取值为
IPC_CREAT:共享内存不存在,创建并返回;共享内存已存在,获取并返回。
取值为IPC_CREAT | IPC_EXCL:共享内存不存在,创建并返回;共享内存已存在,出错返回。
返回值:成功返回一个非负整数,即该共享内存段的标识码;失败返回-1
先来认识一下这个函数:
server.cc:
#include"comm.hpp"int main()
{Shm shm;shm.Create();return 0;
}
client.cc:
#include"comm.hpp"
int main()
{return 0;
}
comm.hpp:
#include <iostream>
#include <cstdio>
#include <sys/ipc.h>
#include <sys/shm.h>const int gdefaultid = -1;
const int gsize = 4096;
const std::string pathname = ".";
const int projid = 0x66;#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0)class Shm
{
public:Shm() : _shmid(gdefaultid), _size(gsize){}// 创建的内存要是全新的内存,注意函数的参数void Create(){key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("key: 0x%x\n", k);_shmid=shmget(k,_size,IPC_CREAT | IPC_EXCL);if(_shmid < 0){ERR_EXIT("shmget");}printf("shmid: %d\n", _shmid);}~Shm(){}private:int _shmid;int _size;
};
编译后运行:
因为之前已经运行过了,根据shmget函数的个性这里会报错。
ipcs -m可以查看我们已经创建的共享内存:
这里删除的时候要用shmid(删除和控制共享内存,在用户层我们不能用key,key未来只给内核来区分唯一性,我们用户需要用shmid来管理共享内存):
注意共享内存的生命周期,随内核。
到这里上面是用函数来创建共享内存和用指令查看共享内存的操作。
shmctl函数:
功能:⽤于控制共享内存
原型int shmctl(int shmid, int cmd, struct shmid_ds *buf);
参数shmid:由shmget返回的共享内存标识码cmd:将要采取的动作(有三个可取值)buf:指向⼀个保存着共享内存的模式状态和访问权限的数据结构
返回值:成功返回0;失败返回-1
用指令来删除共享内存是一种方式,这种方式太麻烦了,这里我们就用代码来删:
void Destroy(){if(_shmid==gdefaultid) return;int n=shmctl(_shmid,IPC_RMID,NULL);if(n>0){printf("shmctl delete shm: %d success\n",_shmid);}else{ERR_EXIT("shmctl\n");}}
shmctl函数来进行删除。
shmat:
功能:将共享内存段连接到进程地址空间
原型void *shmat(int shmid, const void *shmaddr, int shmflg);
参数shmid: 共享内存标识shmaddr:指定连接的地址shmflg:它的两个可能取值是SHM_RND和SHM_RDONLY
返回值:成功返回⼀个指针,指向共享内存第⼀个节;失败返回-1
说明:
shmaddr为NULL,核⼼⾃动选择⼀个地址
shmaddr不为NULL且shmflg⽆SHM_RND标记,则以shmaddr为连接地址。
shmaddr不为NULL且shmflg设置了SHM_RND标记,则连接的地址会⾃动向下调整为SHMLBA的整数倍。
公式:shmaddr - (shmaddr % SHMLBA)
shmflg=SHM_RDONLY,表⽰连接操作⽤来只读共享内存
5.3代码
client.cc:
#include"comm.hpp"
int main()
{// Shm shm;// shm.Get();// sleep(5);// //映射到自己的地址空间// shm.Attach();// sleep(5);// shm.Destroy();Shm shm(pathname,projid,USER);char* mem=(char*)shm.VirtualAddr();while(true){printf("%s\n",mem);sleep(1);}return 0;
}
server.cc:
#include"comm.hpp"int main()
{// Shm shm;// shm.Create();// sleep(5);// shm.Attach();// shm.VirtualAddr();// sleep(5);// shm.Destroy();Shm shm(pathname,projid,CREATER);char* mem=(char*)shm.VirtualAddr();for(char c='A';c<='Z';c++){mem[c-'A']=c;sleep(1);}return 0;
}
comm.hpp
#include <iostream>
#include <cstdio>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>
#include <string>const int gdefaultid = -1;
const int gsize = 4096;
const std::string pathname = ".";
const int projid = 0x66;
const int mode = 0666;
#define CREATER "creater"
#define USER "user"#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0)class Shm
{// 这里单独封装创建出来的共享内存void CreateHalper(int flg){// key_t k=ftok(pathname.c_str(),projid);// if(k<0)// {// ERR_EXIT("ftok");// }printf("key: 0x%x\n", _key);_shmid = shmget(_key, _size, flg);if (_shmid < 0){ERR_EXIT("shmget");}printf("shmid: %d\n", _shmid);}// 创建的内存要是全新的内存,注意函数的参数void Create(){CreateHalper(IPC_CREAT | IPC_EXCL | mode);}// 获取共享内存void Get(){CreateHalper(IPC_CREAT); // 这里单独的封装}void Attach(){_start_mem = shmat(_shmid, nullptr, 0);if ((long long)_start_mem < 0){ERR_EXIT("shmat");}printf("attch success\n");}void Destroy(){if (_shmid == gdefaultid)return;int n = shmctl(_shmid, IPC_RMID, NULL);if (n > 0){printf("shmctl delete shm: %d success\n", _shmid);}else{ERR_EXIT("shmctl\n");}}
public:Shm(const std::string &pathname, int projid, const std::string usertype): _shmid(gdefaultid), _size(gsize), _start_mem(nullptr), _usertype(usertype){_key = ftok(pathname.c_str(), projid);if (_key < 0){ERR_EXIT("ftok");}if (usertype == CREATER)Create();else if (usertype == USER)Get();Attach();}int GetSize(){return _size;}// void Destroy()// {// if (_shmid == gdefaultid)// return;// int n = shmctl(_shmid, IPC_RMID, NULL);// if (n > 0)// {// printf("shmctl delete shm: %d success\n", _shmid);// }// else// {// ERR_EXIT("shmctl\n");// }// }// void Attach()// {// _start_mem=shmat(_shmid,nullptr,0);// if((long long)_start_mem<0)// {// ERR_EXIT("shmat");// }// printf("attch success\n");// }void *VirtualAddr(){printf("VirtualAddr:%p\n", _start_mem);return _start_mem;}~Shm(){if(_usertype==CREATER)Destroy();}private:int _shmid;int _key;int _size;void *_start_mem;std::string _usertype;
};
现象:
注意