《Linux操作系统原理分析之Linux 进程管理 8》(12)
- 4 Linux 进程管理
- 4.8 IPC 消息队列
- 4.8.1 消息队列的结构
- 1.消息
- 2.消息队列
- 4.8.2 消息队列的生成与控制
- 1.建立及检索消息队列 建立及检索消息队列
- 2.消息队列的控制 消息队列的控制
- 4.8.3 消息的发送与接收
- 4.8.4 消息队列的程序例
4 Linux 进程管理
4.8 IPC 消息队列
linux 中进程通信两种高级通信机制 IPC 的消息队列和共享内存。IPC 消息队列,其中涉及的函数和数据结构分别定义在 ipc/msg.c 和 include/linux/msg.h。
4.8.1 消息队列的结构
IPC 消息队列一般用于客户机/服务器(C/S)模型中,客户机进程向服务器发送请求服务的消息,服务器进程接受到消息后执行客户机请求
1.消息
IPC 中一个消息由消息头和消息正文组成。消息头是一个 msg 结构体。其定义如下:
struct msg
{
struct msg *msg_next; /*指向下一个消息*/
long msg_type; /*消息类型:大于 0,是通信双方约定的消息标志*/
char *msg_spot; /*消息正文地址*/
time_t msg_time; /*消息发送时间*/
short msg_ts; /*消息正文大小,最大长度由 MSGMAX 决定,缺省为
4057 字节*/
}
2.消息队列
Linux 中可以根据进程的需要建立多个 IPC 消息队列,消息队列的最大数量由符号常量 MSGMNI 决定,缺省为 128。系统对所有消息队列统一管理。每个消息队列有唯一的标识号。
每个消息队列是由消息结构体构成的单向链表。
描述消息队列的数据结构 struct msqid_ds,称为消息队列描述符。与消息队列一一对应。其定义如下:
struct msqid_ds
{
Struct ipc_perm msg_perm; /*访问权限*/
Struct msg *msg_first; /*指向消息队列头*/
Struct msg *msg_last; /*指向消息队列尾*/
Time_t msg_stime; /*最近发送消息的时间*/
Time_t msg_rtime; /*最近接收消息的时间*/
Time_t msg_ctime; /*最近修改消息的时间*/
Struct wait_queue *wait; /*等待接收消息的进程等待队列*/
Struct wait_queue *wait; /*等待发送消息的进程等待队列*/
Unshort msg_cbytes; /*队列中当前字节数*/
Unshort msg_qnum; /*队列中消息数*/
Unshort msg_qbytes; /*队列最大字节数,不能超过 MSGMN(缺省 16384)*/
Unshort msg_lspid; /*最近发送进程的 PID*/
Unshort msg_lrpid; /*最近接收进程的 PID*/
}
4.8.2 消息队列的生成与控制
1.建立及检索消息队列 建立及检索消息队列
在程序中使用 msgget()建立一个消息队列或者检索消息队列的标识号。其定义为:
Int msgget(key_t key, int msgflg) /*msgflg 的值与 semget()中的 semflg 完全相同*/
调用成功则返回消息队列的标识号,否则返回-1;
2.消息队列的控制 消息队列的控制
对消息队列的控制操作由系统调用 msgctl()实现,其定义如下:
Int msgctl(int msqid,int cmd,struct msqid_ds *buf)
Msqid:消息队列的标识号。
Cmd:操作类型。
Buf:用于向函数传递数据和从函数的得到的结果数据。
说明:cmd 参数的种类与 semctl()类似,但制定操作的意义不同。主要有:
MSG_STAT 或或或或 IPC_STAT:把指定消息队列的描述符内容拷贝到 buf 指定的 msqid_ds 结构体中。
IPC_SET:允许修改制定消息队列描述符内容。该操作用于修改消息队列的访问权限,即 msg_perm的成员项。超级用户可以修改 msg_dbyte 的值。变更后,msg_ctime 的值自动更新。
IPC_RMID:删除消息队列及其数据结构,该操作只有超级用户或消息队列生成者进行。
MSG_INFO 或 IPC_INFO:把与消息队列有关的最大值数据输出到 msginfo 结构体中。在该结构体中记录着 IPC 消息队列的消息数、字节数和消息的字节最大数。
4.8.3 消息的发送与接收
进程使用系统调用 msgsnd()向消息队列写消息;接受进程使用系统调用 msgrcv()从消息队列读取消息。
Int msgsnd(int msqid,struct msgbuf *msgp,int msgsz,int msgopt);
Int msgrcv(int msqid,struct msgbuf *msgp,int msgsz,int msgtyp,int msgopt);
Msgsnd()调用成功返回 0,msgrcv()调用成功返回值为接收到的消息的长度;两个函数调用失败返回错误信息。
👉Msqid:消息队列标识号。
👉Msgp:指向一个 msgbuf 结构体 其定义为:
Struct msgbuf
{
Long mtype; /*消息类型*/
Char mtext[1]; /*消息正文*/
}
说明:
1.msgbuf 结构体,相当于原理部分所提的消息发送区或消息接收区。
2.Mtext,只有一个字节,并不表示消息正文本身,仅指名消息正文地址。
3.用户可以自行定义一个结构体来代替 Msgbuf结构体。(参见程序例)
👉Msgsz:消息正文长度。
👉Msgopt:操作模式。
取值为 0 时,表示忽略该参数。
在接收函数中,取值为 IPC_NOWAIT,表示即使消息队列没有需要的消息,进程也不阻塞,返回值为 ENOMEG,通知进程消息不存在。
在发送函数中,取值为 IPC_NOWAIT,表示即使消息队列已满,进程也不阻塞,返回值为 ENOMEG,通知进程重新发送消息。
👉Msgtyp:从消息队列读取消息的方式。
Msgtyp 取值为 0,读取消息队列的第一个消息。
Msgtyp 取值大于 0,表示消息类型,这时读取队列中符合该类型的第一个消息。若msgopt 指定为 MSG_EXECPT,则读取队列第一个不符合该类型的消息。
Msgtyp 取值小于 0,则表示读取小于其绝对值的类型值最小的第一个消息。 当一个消息从消息队列中读取后,由系统自动从队列中将其删除。
4.8.4 消息队列的程序例
使用消息队列在进程间通信的程序例,其功能是在进程间完成文件的传送。一个发送进程把两个文件送入消息队列,另外两个接收进程分别从消息队列各自接收一个文件。该例由 3 个源程序文件组成:user.h 是共用的头部文件,filesnd.c 的功能是把两个文件读出并发送到消息队列,filercv.c 的功能是从消息队列读取文件。发送进程执行 filesnd.c,两个接收进程执行 filercv.c。
/*user.h*/
#define F_KEY “copyfile” /*指定生成 IPC 键值的文件名*/
#define F_TYPE1 1 /*第一个文件的消息类型*/
#define F_TYPE2 2 /*第二个文件的消息类型*/
#define F_TYPE_END1 3 /*第一个文件接收完毕的消息类型*/
#define F_TYPE_END2 4 /*第二个文件接收完毕的消息类型*/
#define BUF_SIZE 256
Struct msg_data /*替代上节所讲 msgbuf 结构体,作为发送区(接收区)*/
{
Long type;
Int size;
Char buf[BUF_SIZE];
}/*filesnd.c 数据发送程序*/
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include < sys/stat.h >
#include “user.h “
Close_all(int msgid, int fd1, int fd2)
{
Close(fd1); /*关闭文件*/
Close(fd2); /*关闭文件*/
Msgctl(msgid,IPC_RMID,0);/*删除消息队列*/
}
Main(int argc,char **argv)
{
Int fd1,fd2,msgid;
Int end1=1,end2=1;
Key_t key;
Struct msg_data data1,data2;
If(argc!=3)/*检查命令行参数*/
{
Printf(”usage:argv[0] File1 File2 <CR>\n”);
Exit(-1);
}
If((fd1=open(argv[1],O_RDONLY)==-1) /*打开第一个文件,只读*/
{
Printf(“%s can’t opened\n”,argv[1]);
Exit(-1);
}
If((fd2=open(argv[2],O_RDONLY)==-1) /*打开第二个文件,只读*/
{
Printf(“%s can’t opened\n”,argv[2]);
Exit(-1);
}
If((key=ftok(F_KEY,’a’))==-1)/*生成 IPC 键值*/
{
Close(fd1);
Close(fd2);
Printf(“Sender: create Key Error.\n”);
Exit(-1);
}
If((msgid=msgget(key,IPC_CREAT|IPC_EXCL|0666))==-1) /*生成消息队列*/
{
Close(fd1);
Close(fd2);
Printf(“Sender: create Message queue Error.\n”);
Exit(-1);
}
Printf(“\nSender: create Message queue created.\n”);
Printf(“Copy %s and %s\n”,argv[1],argv[2]);
Data1.type=F_TYPE1; /*确定两个文件的消息类型*/
Data2.type=F_TYPE2;
Data1.size= Data2.size=-1;
While(data1.size||data2.size) /*读取和发送两个文件*/
{
If(data1.size)
{
Data1.size=read(fd1,data1.buf,BUF_SIZE) /*读取第一个文件*/
If(data1.size==-1)
{
Close_all(msgid,fd1,fd2);
Printf(“Sender:error read file%s \n”,argv[1]);
Exit(-1);
}
/*写入消息队列*/
If(msgsnd(msgid,(struct msgbuf*)&data1,sizeof(struct msg_data),0)==-1)
{
Printf(“Sender:error message send file%s \n”,argv[1]);
Exit(-1);
}
}
If(data2.size)
{
Data2.size=read(fd2,data2.buf,BUF_SIZE) /*读取第二个文件*/
If(data2.size==-1)
{
Close_all(msgid,fd1,fd2);
Printf(“Sender:error read file%s \n”,argv[2]);
Exit(-1);
}
/*写入消息队列*/
If(msgsnd(msgid,(struct msgbuf*)&data2,sizeof(struct msg_data),0)==-1)
{
Printf(“Sender:error message send file%s \n”,argv[2]);
Exit(-1);
}
}
}
Printf(“Sender:data transmission to message queue complete \n”);
Printf(“Sender:waiting for receiver to finish reading\n”);
/*等待接收进程结束*/
While(end1||end2)
{
Struct msgbuf rec;
If(msgrcv(msgid,&rec,sizeof(struct msgbuf),TYPE_END1,IPC_NOWAIT)!=-1)
End1=0;
If(msgrcv(msgid,&rec,sizeof(struct msgbuf),TYPE_END2,IPC_NOWAIT)!=-1)
End2=0;
}
Printf(“\nSender:disconnection of the two receiver processes \n”);
Close_all(msgid,fd1,fd2);
}/*filercv.c 数据接收程序*/
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include < sys/stat.h >
#include “user.h “
Main(int argc,char **argv)
{
Int fd,ret,msgid;
Int no;
Int type;
Key_t key;
Struct msg_data data;
Struct msgbuf dis_msg;
If(argc!=3)/*检查命令行参数*/
{
Printf(”usage:filerv 1|2 File_name <CR>\n”);
Exit(-1);
}
No=atoi(argv[1]);
If((no!=1)&&(no!=2))
{
Printf(”usage:%s 1|2 File_name <CR>\n”,argv[0]);
Exit(-1);
}
If(no==1)type = F_TYPE1;
Else type = F_TYPE2;
If((key=ftok(F_KEY,’a’))==-1) /*生成 IPC 键值*/
{
Printf(“receiver: create Key Error.\n”);
Exit(-1);
}
If((msgid=msgget(key,IPC_EXCL))=-1)/*得到键值对应的消息队列标识号*/
{
Printf(“receive:create message queue error\n”);
Exit(-1);
}
Printf(“\nreceive %d connected to the message queue \n”,no,getid());
If((fd2=open(argv[2],O_WRONLY)==-1) /*打开文件,只写*/
{
Printf(“%s can’t opened\n”,argv[2]);
Exit(-1);
}
While(1)
{ /*从消息队列接收消息*/
If(msgrcv(msgid,(struct msgbuf*)&data, sizeof(struct msg_data),type,0)!=-1)
{
Printf(“Receiver %d error during reception.\n”,no);
Exit(-1);
}
If(data.size==0) break;
Ret = write(fd,data.buf,data.size) /*把读取的消息写入文件*/
If(ret==-1)
{
Close(fd);
Msgctl(msgid,IPC_RMID,0);
Printf(“receiver %d error during writing\n“,no);
Exit(-1);
}
}
Close(fd);
Printf(“receiver %d disconnection\n“,no);
If(no==1)
Dis_msg.mtype = TYPE_END1;
else
Dis_msg.mtype = TYPE_END2;
Msgsnd(msgid,&dis_msg,sizeof(struct msgbuf),0);/*发送接收完毕消息*/
}
该程序分别由三个进程执行,使用的命令和运行结果如下:
#./filesnd dbfile1 dbfile2 &
Sender: message queue create;
Sender: data transmission to message queue completed
Sender: waiting for receiver to finish reading
#./filercv 1 file1&
Receiver 1 connected to the message queue
Receiver 1 disconnection
#./filercv 1 file2
Receiver 2 connected to the message queue
Receiver 2 disconnection
Sender: disconnection of the two receiver processes.