在现代软件开发中,仅仅依靠单线程程序已经难以满足日益复杂的需求。尤其是在需要高并发、高可用性或者需要在隔离不同功能模块的场景下,多进程编程(也就是我们今天要深入探讨的IPC — intert-Process-Communication,进程间通信)成为了必不可少的技能。
本文将带领你深入理解C++中的IPC机制,从理论到实践,从简单到复杂,通过详细的案例代码,让你彻底掌握多进程协作的艺术。
1.为什么需要IPC?理解多进程的诱惑
我们先来思考一下,为什么不直接使用多线程,而要引入更复杂的IPC呢?
- 资源隔离与稳定性:每个进程都有独立的地址空间。一个进程崩溃通常不会影响到其他进程,这大大提高了系统 的健壮性。例如,浏览器中每个标签页通常是一个独立的进程。
- 安全性:不同进程之间的数据默认是隔离的,这使得数据访问更加可控,降低了恶意攻击的风险。
- 利用多核CPU: 真正的并行执行,不同进程可以在不同的CPU核心上同时运行不同的指令序列,充分利用现代多核处理器的性能。
- 模块化与解耦:可以将大型系统拆分为多个独立的、职责单一的进程,降低系统复杂度,便于维护和升级。
- 继承性操作:在Linux/Unix系统中,fork()创建的子进程会继承父进程的大部分资源,这为某些特定的编程模式提供了便利。
当然,与多线程相比,多进程的开销(创建,销毁,上下文切换(看注解))通常更大,进程间通信也更复杂。因此,选择多线程还是多进程,取决于你的具体应用场景和权衡。
2.IPC的基石:共享数据与消息传递
无论采用何种IPC方式,其核心思想无非两种:
1.共享数据:多个进程访问同一块物理内存或文件
2.消息传递:进程之间通过发送和接收消息来交换信息
接下来,我们将围绕这两种核心思想,介绍C++中常用的IPC机制。
3.C++IPC实践:深入各种机制
C++本身不像Java有高级的并发库,其IPC能力通常依赖于底层操作系统提供的API。我们将主要聚焦于Linux/Unix环境下的IPC机制,因为它们是C++IPC中最常用、最灵活的。
3.1 管道:最简单的双向通信
管道是最早、最简单的IPC形式。他分为两种:
- 匿名管道:只能用于具有亲缘关系的进程(父子、兄弟),因为它依赖于文件描述符的继承。
- 命名管道:可以用于任意两个进程,即使它们之间没有亲缘关系。它在文件系统中有一个对应的路径。
优点:简单易用
缺点:效率不高(数据需要拷贝),半双工(通常),匿名管道限制多。
案例:匿名管道
匿名管道通过pipe()函数创建一对文件描述符:一个用于读取,一个用于写入。
场景:父进程发送数据给子进程,子进程读取并打印。
#include <iostream>
#include <unistd.h> // For pipe(), fork(), read(), write(), close()
#include <sys/wait.h> // For wait()
#include <string>
#include <cstring> // For strlen()
int main() {
int pipe_fd[2]; // pipe_fd[0] for read, pipe_fd[1] for write
pid_t pid;
const int BUFFER_SIZE = 256;
char buffer[BUFFER_SIZE];
const char* message = "Hello from parent process!";
// 1. 创建管道
if (pipe(pipe_fd) == -1) {
perror("pipe");
return 1;
}
// 2. 创建子进程
pid = fork();
if (pid == -1) {
perror("fork");
return 1;
}
if (pid == 0) { // 子进程
// 关闭写入端,因为子进程只从管道读取
close(pipe_fd[1]);
std::cout << "Child process: Waiting for data..." << std::endl;
// 3. 子进程从管道读取数据
ssize_t bytes_read = read(pipe_fd[0], buffer, BUFFER_SIZE - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0'; // Null-terminate the string
std::cout << "Child process received: '" << buffer << "'" << std::endl;
} else if (bytes_read == 0) {
std::cout << "Child process: Parent closed pipe." << std::endl;
} else {
perror("read");
}
// 关闭读取端
close(pipe_fd[0]);
exit(0); // 子进程退出
} else { // 父进程
// 关闭读取端,因为父进程只向管道写入
close(pipe_fd[0]);
std::cout << "Parent process: Sending data..." << std::endl;
// 3. 父进程向管道写入数据
if (write(pipe_fd[1], message, strlen(message)) == -1) {
perror("write");
}
std::cout << "Parent process: Data sent." << std::endl;
// 关闭写入端,发送EOF信号给子进程
close(pipe_fd[1]);
// 等待子进程结束
wait(NULL);
std::cout << "Parent process: Child process finished." << std::endl;
}
return 0;
}
运行结果:
Parent process: Sending data...
Parent process: Data sent.
Child process: Waiting for data...
Child process received: 'Hello from parent process!'
Parent process: Child process finished.
代码解析:
pipe(pipe_fd):创建管道。pipe_fd[0]是读端,pipe_fd[1]是写端。
fork():创建子进程。
关键点:父子进程都需要关闭它们不使用的那一端管道。
子进程关闭pipe_fd[1](写端),因为它只读。
父进程关闭pipe_fd[0](读端),因为它只写。
read()/write():像操作文件一样读写管道。
当管道的写入端全部关闭时,read()会返回0,表示文件结束符(EOF)。
案例:命名管道
命名管道(FIFO)通过文件系统路径来识别。
场景:一个进程作为写入者,另一个进程作为读取者。可以先启动读取者,再启动写入者。
fifo_writer.cpp
#include <iostream>
#include <fcntl.h> // For O_WRONLY
#include <sys/stat.h> // For S_IRUSR, S_IWUSR
#include <unistd.h> // For write()
#include <cstring> // For strlen()
const char* FIFO_PATH = "/tmp/my_fifo";
int main() {
// 1. 创建命名管道
// 如果fifo不存在,则创建。0666表示读写权限
if (mkfifo(FIFO_PATH, 0666) == -1) {
if (errno != EEXIST) { // 如果错误不是因为文件已存在
perror("mkfifo");
return 1;
}
std::cout << "FIFO already exists: " << FIFO_PATH << std::endl;
} else {
std::cout << "FIFO created successfully: " << FIFO_PATH << std::endl;
}
// 2. 打开命名管道用于写入
int fd = open(FIFO_PATH, O_WRONLY);
if (fd == -1) {
perror("open for write");
return 1;
}
std::cout << "Opened FIFO for writing." << std::endl;
const char* message = "Hello from FIFO writer!";
// 3. 写入数据
if (write(fd, message, strlen(message) + 1) == -1) { // +1 for null terminator
perror("write");
} else {
std::cout << "Message sent: '" << message << "'" << std::endl;
}
// 4. 关闭管道
close(fd);
std::cout << "Closed FIFO." << std::endl;
// 清理:删除FIFO文件(可选,但推荐在不再使用时清理)
// unlink(FIFO_PATH); // 通常在reader或一个专门的清理脚本中执行
return 0;
}
fifo_reader.cpp
#include <iostream>
#include <fcntl.h> // For O_RDONLY
#include <sys/stat.h> // For S_IRUSR, S_IWUSR
#include <unistd.h> // For read()
#include <cstdio> // For remove()
const char* FIFO_PATH = "/tmp/my_fifo";
const int BUFFER_SIZE = 256;
char buffer[BUFFER_SIZE];
int main() {
// 1. 打开命名管道用于读取
// 注意:如果mkfifo被writer创建,reader可以直接通过open打开。
// 如果reader先启动,且fifo不存在,则会失败,需要reader也先尝试mkfifo。
// 这里我们假设writer会先确保fifo创建。
int fd = open(FIFO_PATH, O_RDONLY);
if (fd == -1) {
perror("open for read");
return 1;
}
std::cout << "Opened FIFO for reading. Waiting for data..." << std::endl;
// 2. 从管道读取数据
ssize_t bytes_read = read(fd, buffer, BUFFER_SIZE - 1);
if (bytes_read > 0) {
buffer[bytes_read] = '\0'; // Null-terminate
std::cout << "Message received: '" << buffer << "'" << std::endl;
} else if (bytes_read == 0) {
std::cout << "End of file (writer closed FIFO)." << std::endl;
} else {
perror("read");
}
// 3. 关闭管道
close(fd);
std::cout << "Closed FIFO." << std::endl;
// 4. 清理:删除FIFO文件(通常由其中一个进程负责,或者一个启动/关闭脚本)
if (remove(FIFO_PATH) == -1) {
perror("remove fifo");
} else {
std::cout << "FIFO removed: " << FIFO_PATH << std::endl;
}
return 0;
}
编译和运行:
g++ fifo_writer.cpp -o writer
g++ fifo_reader.cpp -o reader
# 打开两个终端
# 终端1: 启动reader,它会阻塞直到writer写入
./reader
# 终端2: 启动writer
./writer
运行结果:
终端一(reader):
Opened FIFO for reading. Waiting for data...
Message received: 'Hello from FIFO writer!'
Closed FIFO.
FIFO removed: /tmp/my_fifo
终端2(writer):
FIFO created successfully: /tmp/my_fifo # 或者 FIFO already exists...
Opened FIFO for writing.
Message sent: 'Hello from FIFO writer!'
Closed FIFO.
代码解析:
mkfifo(FIFO_PATH,0666):在FIFO_PATH创建命名管道文件。注意0666是文件权限。如果管道文件不存在,mkfifo会创建它;如果已存在,则什么也不做(如果errno是EEXIST)。
open(FIFO_PATH,0_WRONLY)/open(FIFO_PATH,0_RDONLY):通过路径打开命名管道,像常规文件一样进行读写。
remove(FIFO_PATH):删除文件系统中的命名管道。这通常在不需要时进行。
总结
管道有点类似于传小纸条进行收发消息。
3.2 消息队列:结构化消息传递
消息队列允许进程以非阻塞方式发送和接受离散消息。消息可以有类型,这使得接收方可以选择性地接受特定类型的消息。
优点:结构化消息,消息类型过滤,避免了管道的字节流特性,可以处理优先级
缺点:消息大小有限制,需要清理IPC资源。
两种主要类型:System V消息队列(较老)和POSIX消息队列(较新,推荐)
案例:POSIX消息队列
POSIX消息队列使用mq_open,mq_send,mq_receive,mq_close,mq_unlink等函数。
场景:一个进程发送消息,另一个进程接收消息。
mq_sender.cpp
#include <iostream>
#include <mqueue.h> // For mq functions
#include <cstring> // For strlen()
#include <fcntl.h> // For O_CREAT, O_RDWR
#include <sys/stat.h> // For S_IRUSR, S_IWUSR
const char* MQ_NAME = "/my_message_queue";
const int MAX_MSGS = 10;
const int MSG_SIZE = 256;
int main() {
mqd_t mq; // Message queue descriptor
struct mq_attr attr;
// 1. 设置消息队列属性
attr.mq_flags = 0; // Blocking
attr.mq_maxmsg = MAX_MSGS;
attr.mq_msgsize = MSG_SIZE;
attr.mq_curmsgs = 0; // Current messages in queue (ignored for open)
// 2. 打开或创建消息队列
// O_CREAT: 如果不存在则创建。 O_RDWR: 读写权限。 S_IRUSR|S_IWUSR: 用户读写权限
mq = mq_open(MQ_NAME, O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
return 1;
}
std::cout << "Message queue opened/created: " << MQ_NAME << std::endl;
const char* messages[] = {
"Hello from sender!",
"This is the second message.",
"Goodbye!"
};
unsigned int priority = 10; // Message priority
// 3. 发送消息
for (int i = 0; i < 3; ++i) {
if (mq_send(mq, messages[i], strlen(messages[i]) + 1, priority) == -1) {
perror("mq_send");
} else {
std::cout << "Sent message: '" << messages[i] << "' with priority " << priority << std::endl;
}
priority--; // Just for demonstration, can be constant
usleep(500000); // 0.5 sec delay
}
// 4. 关闭消息队列
if (mq_close(mq) == -1) {
perror("mq_close");
}
std::cout << "Message queue closed." << std::endl;
// 5. 解链消息队列(可选,通常由接收方或一个清理脚本负责)
// if (mq_unlink(MQ_NAME) == -1) {
// perror("mq_unlink");
// } else {
// std::cout << "Message queue unlinked." << std::endl;
// }
return 0;
}
mq_receiver.cpp
#include <iostream>
#include <mqueue.h> // For mq functions
#include <cstring> // For strlen()
#include <fcntl.h> // For O_RDONLY
#include <sys/stat.h> // For S_IRUSR, S_IWUSR
#include <unistd.h> // For usleep()
const char* MQ_NAME = "/my_message_queue";
const int MSG_SIZE = 256; // Must match sender's msgsize
int main() {
mqd_t mq;
char buffer[MSG_SIZE];
unsigned int priority;
// 1. 打开消息队列
mq = mq_open(MQ_NAME, O_RDONLY); // 必须以只读模式打开
if (mq == (mqd_t)-1) {
perror("mq_open");
return 1;
}
std::cout << "Message queue opened: " << MQ_NAME << std::endl;
// 2. 获取消息队列属性(可选,用于确认消息大小等)
struct mq_attr attr;
if (mq_getattr(mq, &attr) == -1) {
perror("mq_getattr");
mq_close(mq);
mq_unlink(MQ_NAME); // 尝试清理
return 1;
}
std::cout << "Queue maxmsg: " << attr.mq_maxmsg << ", msgsize: " << attr.mq_msgsize << std::endl;
// 3. 循环接收消息
std::cout << "Waiting for messages..." << std::endl;
while (true) {
ssize_t bytes_read = mq_receive(mq, buffer, attr.mq_msgsize, &priority);
if (bytes_read == -1) {
if (errno == EAGAIN) { // No message available in non-blocking mode
// In blocking mode, this only happens if queue was made non-blocking
// or if an interrupt occurred. For blocking mode, it will just wait.
// For demonstration, we'll break on error or timeout if that was configured.
std::cout << "No messages currently." << std::endl;
usleep(100000); // Wait a bit before trying again
continue;
}
perror("mq_receive");
break; // Exit on other errors
} else if (bytes_read == 0) {
// This case might not be reached for POSIX MQ like pipes if queue is empty
// and no more sender. mq_receive blocks.
std::cout << "Received 0 bytes, possibly queue shutdown or error." << std::endl;
break;
} else {
buffer[bytes_read] = '\0'; // Null-terminate
std::cout << "Received message: '" << buffer << "' (Priority: " << priority << ")" << std::endl;
}
if (strcmp(buffer, "Goodbye!") == 0) {
std::cout << "Exiting on 'Goodbye!' message." << std::endl;
break;
}
}
// 4. 关闭消息队列
if (mq_close(mq) == -1) {
perror("mq_close");
}
std::cout << "Message queue closed." << std::endl;
// 5. 解链消息队列 (清理)
if (mq_unlink(MQ_NAME) == -1) {
perror("mq_unlink");
} else {
std::cout << "Message queue unlinked." << std::endl;
}
return 0;
}
编译和运行:(需要链接实时库lrt)
g++ mq_sender.cpp -o sender -lrt
g++ mq_receiver.cpp -o receiver -lrt
# 独立启动
# 终端1:
./receiver
# 终端2:
./sender
运行结果:
终端1(receiver):
Message queue opened: /my_message_queue
Queue maxmsg: 10, msgsize: 256
Waiting for messages...
Received message: 'Hello from sender!' (Priority: 10)
Received message: 'This is the second message.' (Priority: 9)
Received message: 'Goodbye!' (Priority: 8)
Exiting on 'Goodbye!' message.
Message queue closed.
Message queue unlinked.
终端2(sender):
Message queue opened/created: /my_message_queue
Sent message: 'Hello from sender!' with priority 10
Sent message: 'This is the second message.' with priority 9
Sent message: 'Goodbye!' with priority 8
Message queue closed.
代码解析:
mq_open():打开或创建消息队列。O_CREAT表示如果不存在则创建,attr参数用于设置队列的特性,如mq_maxmsg(最大消息数)和mq_msgsize(最大消息大小)。
mq_send():发送消息。第三个参数是消息内容,第四个参数是消息的优先级。
mq_receive():接收消息。消息按优先级从高到低出队。
mq_close():关闭消息队列。
mq_unlink():从系统上删除消息队列。
3.3共享内存:最快速的IPC
共享内存是最高效的IPC机制,因为它允许不同进程直接访问同一块物理内存。一旦内存映射完成,进程间就不再需要系统调用来传递数据,直接读写内存即可。
优点:速度最快,无需数据拷贝。
缺点:
- 同步问题:多个进程同时访问共享内存区域时,需要严格的同步机制(如互斥锁、信号量)来避免数据竞争和不一致。
- 管理复杂:需要手动管理共享内存的创建、映射、解除映射和销毁。
- 生命周期:共享内存可以独立于创建它的进程而存在。
两种主要类型:System V共享内存(较老)和POSIX共享内存(较新,推荐)
案例:POSIX共享内存
POSIX共享内存使用/dev/shm(通常是临时的文件系统)作为后端。
场景:一个进程写入数据到共享内存,另一个进程从共享内存读取数据。
shm_writer.cpp
#include <iostream>
#include <sys/mman.h> // For shm_open, mmap, munmap, shm_unlink
#include <fcntl.h> // For O_CREAT, O_RDWR
#include <unistd.h> // For ftruncate
#include <cstring> // For strcpy, strlen
#include <semaphore.h> // For sem_t, sem_open, sem_post, sem_wait, sem_unlink
const char* SHM_NAME = "/my_shared_memory";
const char* SEM_NAME = "/my_sem";
const int SHM_SIZE = 1024;
int main() {
int shm_fd;
char* shm_ptr;
sem_t* sem;
// 1. 创建/打开共享内存对象
shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
if (shm_fd == -1) {
perror("shm_open");
return 1;
}
std::cout << "Shared memory object opened/created: " << SHM_NAME << std::endl;
// 2. 设置共享内存大小
if (ftruncate(shm_fd, SHM_SIZE) == -1) {
perror("ftruncate");
shm_unlink(SHM_NAME);
return 1;
}
std::cout << "Shared memory size set to " << SHM_SIZE << " bytes." << std::endl;
// 3. 将共享内存映射到进程地址空间
shm_ptr = (char*)mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (shm_ptr == MAP_FAILED) {
perror("mmap");
shm_unlink(SHM_NAME);
return 1;
}
std::cout << "Shared memory mapped at address: " << (void*)shm_ptr << std::endl;
// 4. 打开/创建信号量用于同步
// 1表示信号量初始值,供一个进程使用
sem = sem_open(SEM_NAME, O_CREAT | O_RDWR, 0666, 0); // Initial value 0, means consumer must wait
if (sem == SEM_FAILED) {
perror("sem_open");
munmap(shm_ptr, SHM_SIZE);
shm_unlink(SHM_NAME);
return 1;
}
std::cout << "Semaphore opened/created: " << SEM_NAME << std::endl;
// 5. 写入数据到共享内存
const char* message = "Hello from shared memory writer!";
strncpy(shm_ptr, message, SHM_SIZE);
shm_ptr[SHM_SIZE - 1] = '\0'; // Ensure null termination
std::cout << "Wrote message to shared memory: '" << shm_ptr << "'" << std::endl;
// 6. 发布信号量,通知读取者数据已准备好
if (sem_post(sem) == -1) {
perror("sem_post");
}
std::cout << "Semaphore posted. Reader can now access data." << std::endl;
// 7. 解除内存映射
if (munmap(shm_ptr, SHM_SIZE) == -1) {
perror("munmap");
}
std::cout << "Shared memory unmapped." << std::endl;
// 8. 关闭共享内存文件描述符
close(shm_fd);
std::cout << "Shared memory fd closed." << std::endl;
// 9. 关闭信号量 (不删除)
if (sem_close(sem) == -1) {
perror("sem_close");
}
std::cout << "Semaphore closed (not unlinked)." << std::endl;
// 一般由接收方或一个专门的清理脚本负责 unlink
// shm_unlink(SHM_NAME);
// sem_unlink(SEM_NAME);
return 0;
}
shm_reader.cpp
#include <iostream>
#include <sys/mman.h> // For shm_open, mmap, munmap, shm_unlink
#include <fcntl.h> // For O_RDWR
#include <unistd.h> // For close
#include <cstring> // For strlen
#include <semaphore.h> // For sem_t, sem_open, sem_post, sem_wait, sem_unlink
const char* SHM_NAME = "/my_shared_memory";
const char* SEM_NAME = "/my_sem";
const int SHM_SIZE = 1024;
int main() {
int shm_fd;
char* shm_ptr;
sem_t* sem;
// 1. 打开共享内存对象
shm_fd = shm_open(SHM_NAME, O_RDWR, 0666); // 确保读写权限
if (shm_fd == -1) {
perror("shm_open");
return 1;
}
std::cout << "Shared memory object opened: " << SHM_NAME << std::endl;
// 2. 将共享内存映射到进程地址空间
shm_ptr = (char*)mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (shm_ptr == MAP_FAILED) {
perror("mmap");
close(shm_fd);
shm_unlink(SHM_NAME); // 失败时清理
return 1;
}
std::cout << "Shared memory mapped at address: " << (void*)shm_ptr << std::endl;
// 3. 打开信号量
sem = sem_open(SEM_NAME, O_RDWR);
if (sem == SEM_FAILED) {
perror("sem_open");
munmap(shm_ptr, SHM_SIZE);
close(shm_fd);
shm_unlink(SHM_NAME); // 失败时清理
return 1;
}
std::cout << "Semaphore opened: " << SEM_NAME << std::endl;
// 4. 等待信号量,直到写入者写入数据
std::cout << "Waiting for semaphore (data ready)..." << std::endl;
if (sem_wait(sem) == -1) {
perror("sem_wait");
// Clean up partially created resources
munmap(shm_ptr, SHM_SIZE);
close(shm_fd);
shm_unlink(SHM_NAME);
sem_close(sem);
sem_unlink(SEM_NAME); // 信号量也要清理
return 1;
}
std::cout << "Semaphore acquired. Reading data..." << std::endl;
// 5. 从共享内存读取数据
std::cout << "Read from shared memory: '" << shm_ptr << "'" << std::endl;
// 6. 解除内存映射
if (munmap(shm_ptr, SHM_SIZE) == -1) {
perror("munmap");
}
std::cout << "Shared memory unmapped." << std::endl;
// 7. 关闭共享内存文件描述符
close(shm_fd);
std::cout << "Shared memory fd closed." << std::endl;
// 8. 关闭信号量
if (sem_close(sem) == -1) {
perror("sem_close");
}
std::cout << "Semaphore closed." << std::endl;
// 9. 解链共享内存和信号量
if (shm_unlink(SHM_NAME) == -1) {
perror("shm_unlink");
} else {
std::cout << "Shared memory unlinked: " << SHM_NAME << std::endl;
}
if (sem_unlink(SEM_NAME) == -1) {
perror("sem_unlink");
} else {
std::cout << "Semaphore unlinked: " << SEM_NAME << std::endl;
}
return 0;
}
编译和运行:(需要链接实时库lrt)
g++ shm_writer.cpp -o writer -lrt
g++ shm_reader.cpp -o reader -lrt
# 独立启动
# 终端1: 启动reader,它会阻塞直到writer写入
./reader
# 终端2: 启动writer
./writer
运行结果:
终端1(reader):
Shared memory object opened: /my_shared_memory
Shared memory mapped at address: 0x7fa2c40c3000
Semaphore opened: /my_sem
Waiting for semaphore (data ready)...
Semaphore acquired. Reading data...
Read from shared memory: 'Hello from shared memory writer!'
Shared memory unmapped.
Shared memory fd closed.
Semaphore closed.
Shared memory unlinked: /my_shared_memory
Semaphore unlinked: /my_sem
终端2(writer):
Shared memory object opened/created: /my_shared_memory
Shared memory size set to 1024 bytes.
Shared memory mapped at address: 0x7f09f9872000
Semaphore opened/created: /my_sem
Wrote message to shared memory: 'Hello from shared memory writer!'
Semaphore posted. Reader can now access data.
Shared memory unmapped.
Shared memory fd closed.
Semaphore closed (not unlinked).
代码解析:
shm_open():打开或创建共享内存对象。它返回一个文件描述符。
ftruncate():设置共享内存对象大小。这是必要的,因为shm_open只创建了元数据,没有分配实际空间。
mmap():将共享内存文件描述符映射到进程的虚拟地址空间。MAP_SHARED表示多个进程可以共享这个映射。
munmap():接触内存映射。
shm_unlink():从系统中删除共享内存对象。此时,即使有进程还在映射,这个对象也会被标记为删除,当所有映射都解除后,物理内存才会被释放。
同步机制(命名信号量):
sem_open():创建或打开一个命名信号量。
sem_wait():P操作,信号量减1。如果信号量值为0,调用进程会阻塞。
sem_post():V操作,信号量加1.
sem_close():关闭信号量。
sem_unlink():从系统中删除命名信号量。
在这个例子中,信号量控制了读写顺序,写入着sem_post后,读取者才能sem_wait并读取。初始值为0,确保读取者先阻塞。
3.4 信号量:进程同步利器
信号量不仅仅是共享内存的辅助,它们本身也是一种重要的IPC机制,主要用于进程间的同步和互斥。
信号量是一个非负整数,由wait(P操作,减1)和post(V操作,加1)两个原子操作来控制。
二值信号量:值只有0和1,可作为互斥锁使用。
计数信号量:值可以大于1,用于管理有限的资源。
上面的共享内存的例子中已经展示了命名信号量的用法。
优点:灵活的同步机制,可以控制资源访问,支持生产者-消费者模型
缺点:容易死锁,需要细心管理。
3.5信号:简单的异步通知
信号是最低级的IPC机制,用于向进程发送异步通知。它不是用于传输大量数据,而是用于通知事件的发生,比如SIGINT(Ctrl+C),SIGTERM(终止进程),SIGCHLD(子进程状态改变)。
优点:简单、异步、操作系统级别支持
缺点:只能传递少量信息(通常只有信号编号),处理复杂(需要注册信号处理函数),不适合常规数据通信。
案例:信号(kill(),signal())
场景:一个进程发送信号通知另一个进程某个事件发生。
signal_sender.cpp
#include <iostream>
#include <signal.h> // For kill()
#include <unistd.h> // For sleep()
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: " << argv[0] << " <pid_of_receiver>" << std::endl;
return 1;
}
pid_t receiver_pid = std::stoi(argv[1]); // Convert string to PID
std::cout << "Sender: Sending SIGUSR1 to PID " << receiver_pid << " in 2 seconds..." << std::endl;
sleep(2); // Give receiver time to set up
// 发送用户自定义信号1 (SIGUSR1)
if (kill(receiver_pid, SIGUSR1) == -1) {
perror("kill");
return 1;
}
std::cout << "Sender: SIGUSR1 sent successfully." << std::endl;
std::cout << "Sender: Sending SIGTERM to PID " << receiver_pid << " in 3 seconds to terminate..." << std::endl;
sleep(3);
// 发送终止信号 (SIGTERM)
if (kill(receiver_pid, SIGTERM) == -1) {
perror("kill");
return 1;
}
std::cout << "Sender: SIGTERM sent successfully. Receiver should terminate." << std::endl;
return 0;
}
signal_receiver.cpp
#include <iostream>
#include <signal.h> // For signal(), sigaction()
#include <unistd.h> // For getpid(), pause()
#include <atomic> // For atomic<bool>
// 使用原子变量来安全地通知主循环退出
std::atomic<bool> keep_running(true);
// 信号处理函数
void signal_handler(int signum) {
if (signum == SIGUSR1) {
std::cout << "\nReceiver: Received SIGUSR1 (User-defined signal 1)!" << std::endl;
// 可以在这里执行一些轻量级操作
} else if (signum == SIGTERM) {
std::cout << "\nReceiver: Received SIGTERM (Termination signal)! Preparing to exit." << std::endl;
keep_running = false; // 通知主循环退出
} else {
std::cout << "\nReceiver: Received unexpected signal: " << signum << std::endl;
}
}
int main() {
std::cout << "Receiver PID: " << getpid() << std::endl;
// 注册信号处理函数
// 使用sigaction()更强大和推荐,因为它允许更多控制(如sa_flags)
struct sigaction sa;
sa.sa_handler = signal_handler; // 设置信号处理函数
sigemptyset(&sa.sa_mask); // 在信号处理函数执行期间不阻塞其他信号
sa.sa_flags = 0; // 0表示默认行为
if (sigaction(SIGUSR1, &sa, NULL) == -1) {
perror("sigaction SIGUSR1");
return 1;
}
if (sigaction(SIGTERM, &sa, NULL) == -1) {
perror("sigaction SIGTERM");
return 1;
}
std::cout << "Receiver: Waiting for signals... (Looping)" << std::endl;
// 主循环,持续运行直到被信号处理函数改变keep_running
while (keep_running) {
// 在这里可以执行一些其他操作,或者仅仅是等待
std::cout << "Receiver: Working... (PID: " << getpid() << ")" << std::endl;
sleep(1); // 模拟工作
}
std::cout << "Receiver: Exiting gracefully." << std::endl;
return 0;
}
编译和运行
g++ signal_sender.cpp -o sender
g++ signal_receiver.cpp -o receiver
# 终端1: 启动receiver
./receiver
# 终端2: 查看receiver的PID,然后用sender发送信号
# ps aux | grep receiver # 找到PID
# ./sender <receiver_pid>
运行结果:
终端1(receiver):
Receiver PID: 12345 (实际PID)
Receiver: Waiting for signals... (Looping)
Receiver: Working... (PID: 12345)
Receiver: Working... (PID: 12345)
Receiver: Received SIGUSR1 (User-defined signal 1)!
Receiver: Working... (PID: 12345)
Receiver: Working... (PID: 12345)
Receiver: Received SIGTERM (Termination signal)! Preparing to exit.
Receiver: Exiting gracefully.
终端2(sender):
Sender: Sending SIGUSR1 to PID 12345 in 2 seconds...
Sender: SIGUSR1 sent successfully.
Sender: Sending SIGTERM to PID 12345 in 3 seconds to terminate...
Sender: SIGTERM sent successfully. Receiver should terminate.
代码解析:
getpid():获取当前进程的PID
kill(pid,signum):向指定PID的进程发送指定信号。
sigaction(signum,&sa,NULL):注册信号处理函数。sa.sa_handler是我们定义的函数,它将在接收到信号时被调用。
信号处理函数注意事项:信号处理函数应该是“异步信号安全”的。这意味着在函数内部,只应调用那些在信号处理程序中安全的函数(通常是可重入的,如write,_exit,sleep等)。不应进行复杂的内存分配、IO操作或非原子操作,因为它们可能不是原子的,或者在信号处理函数的执行期间中断了主程序的执行,导致了不可预测的行为。这里使用std::atomic<bool>是安全的。
pause():使进程挂起,直到接收到信号。在我们的例子中,主循环用sleep(1)和keep_running来演示。
3.6 套接字(Sockets):网络通信的基石
套接字使IPC中最通用、最强大的机制,因为它不仅适用于同一台机器上的进程,也适用于网络中不同及其上的进程。
- Unix域套接字:仅用于同一台机器上的进程通信,但比TCP/IP套接字效率更高,因为它们绕过了网络协议栈。
- TCP/IP套接字:跨机器通信,网络通信的基石。
优点:极其通用,支持网络透明,面向连接和无连接通信。
缺点:相对复杂。涉及网络协议栈,性能不如共享内存快。
案例:Unix域套接字(AF_UNIX)
UDS在文件系统中由对应的路径,像文件一样。
场景:一个进程作为服务器监听Unix域套接字,另一个进程作为客户端连接并发送消息。
uds_server.cpp
#include <iostream>
#include <sys/socket.h> // For socket(), bind(), listen(), accept()
#include <sys/un.h> // For sockaddr_un
#include <unistd.h> // For close(), unlink()
#include <cstring> // For memset(), strlen()
const char* SOCKET_PATH = "/tmp/my_uds_socket";
const int BUFFER_SIZE = 256;
int main() {
int server_fd, client_fd;
struct sockaddr_un server_addr, client_addr;
socklen_t client_len;
char buffer[BUFFER_SIZE];
// 1. 创建套接字
server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (server_fd == -1) {
perror("socket");
return 1;
}
std::cout << "Server: Socket created." << std::endl;
// 2. 配置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sun_family = AF_UNIX;
strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1);
// 3. 移除旧的套接字文件(如果存在)
unlink(SOCKET_PATH); // Crucial for UDS servers
// 4. 绑定套接字到文件路径
if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("bind");
close(server_fd);
return 1;
}
std::cout << "Server: Bound to " << SOCKET_PATH << std::endl;
// 5. 监听连接
if (listen(server_fd, 5) == -1) { // Max 5 pending connections
perror("listen");
close(server_fd);
unlink(SOCKET_PATH);
return 1;
}
std::cout << "Server: Listening for connections..." << std::endl;
// 6. 接受客户端连接
client_len = sizeof(client_addr);
client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);
if (client_fd == -1) {
perror("accept");
close(server_fd);
unlink(SOCKET_PATH);
return 1;
}
std::cout << "Server: Accepted client connection." << std::endl;
// 7. 从客户端接收数据
ssize_t bytes_read = recv(client_fd, buffer, BUFFER_SIZE - 1, 0);
if (bytes_read == -1) {
perror("recv");
} else if (bytes_read == 0) {
std::cout << "Server: Client disconnected." << std::endl;
} else {
buffer[bytes_read] = '\0'; // Null-terminate
std::cout << "Server: Received: '" << buffer << "'" << std::endl;
// 8. 向客户端发送响应
const char* response_msg = "Message received, server says hello back!";
if (send(client_fd, response_msg, strlen(response_msg) + 1, 0) == -1) {
perror("send");
} else {
std::cout << "Server: Sent response: '" << response_msg << "'" << std::endl;
}
}
// 9. 关闭套接字
close(client_fd);
std::cout << "Server: Client socket closed." << std::endl;
close(server_fd);
std::cout << "Server: Server socket closed." << std::endl;
// 10. 删除套接字文件
if (unlink(SOCKET_PATH) == -1) {
perror("unlink socket file");
} else {
std::cout << "Server: Socket file " << SOCKET_PATH << " removed." << std::endl;
}
return 0;
}
uds_client.cpp
#include <iostream>
#include <sys/socket.h> // For socket(), connect()
#include <sys/un.h> // For sockaddr_un
#include <unistd.h> // For close()
#include <cstring> // For memset(), strlen()
const char* SOCKET_PATH = "/tmp/my_uds_socket";
const int BUFFER_SIZE = 256;
int main() {
int client_fd;
struct sockaddr_un server_addr;
char buffer[BUFFER_SIZE];
// 1. 创建套接字
client_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (client_fd == -1) {
perror("socket");
return 1;
}
std::cout << "Client: Socket created." << std::endl;
// 2. 配置服务器地址
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sun_family = AF_UNIX;
strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1);
// 3. 连接到服务器
std::cout << "Client: Attempting to connect to " << SOCKET_PATH << std::endl;
if (connect(client_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
perror("connect");
close(client_fd);
return 1;
}
std::cout << "Client: Connected to server." << std::endl;
// 4. 向服务器发送数据
const char* message = "Hello from Unix Domain Socket client!";
if (send(client_fd, message, strlen(message) + 1, 0) == -1) {
perror("send");
} else {
std::cout << "Client: Sent: '" << message << "'" << std::endl;
}
// 5. 从服务器接收响应
ssize_t bytes_read = recv(client_fd, buffer, BUFFER_SIZE - 1, 0);
if (bytes_read == -1) {
perror("recv");
} else if (bytes_read == 0) {
std::cout << "Client: Server disconnected." << std::endl;
} else {
buffer[bytes_read] = '\0'; // Null-terminate
std::cout << "Client: Received response: '" << buffer << "'" << std::endl;
}
// 6. 关闭套接字
close(client_fd);
std::cout << "Client: Socket closed." << std::endl;
return 0;
}
编译运行:
g++ uds_server.cpp -o server
g++ uds_client.cpp -o client
# 独立启动
# 终端1: 启动server
./server
# 终端2: 启动client
./client
运行结果:
终端1(server):
Server: Socket created.
Server: Bound to /tmp/my_uds_socket
Server: Listening for connections...
Server: Accepted client connection.
Server: Received: 'Hello from Unix Domain Socket client!'
Server: Sent response: 'Message received, server says hello back!'
Server: Client socket closed.
Server: Server socket closed.
Server: Socket file /tmp/my_uds_socket removed.
终端2(client):
Client: Socket created.
Client: Attempting to connect to /tmp/my_uds_socket
Client: Connected to server.
Client: Sent: 'Hello from Unix Domain Socket client!'
Client: Received response: 'Message received, server says hello back!'
Client: Socket closed.
代码解析:
socket(AF_UNIX,SOCK_STREAM,0):创建Unix域套接字。AF_UNIX指定地址族为Unix域,SOCK_STREAM指定套接字类型为流式(TCP)
struct sockaddr_un:Unix域套接字地址结构,其中sun_path字段存放套接字文件路径。
unlink(SOCKET_PATH):在绑定前,通常需要删除旧的套接字文件,以防止上次程序异常终止未清除。
bind():将套接字文件路径与套接字描述符关联起来。
listen():使服务器套接字进入监听状态。
accept():阻塞式等待客户端连接并接收。成功后返回一个新的套接字描述符用于客户端通信。
connect():客户端连接到指定路劲的服务器套接字。
send()/recv():通过已连接的套接字发送和接收数据。
close():关闭套接字描述符。
unlink(SOCKET_PATH):服务器在退出时,负责删除创建的套接字文件。
4.IPC选择指南
选择合适的IPC机制取决于你的具体需求:
亲缘关系进程间少量数据通信且简单:匿名管道
任意进程间少量数据通信但需要文件路径:命名管道。
结构化消息传递,消息有优先级,需要异步通信:消息队列。
大数据量、高效率通信,且能处理同步问题:共享内存(配合信号量或互斥锁)
进程间简单事件通知或控制:信号。
通用通信,跨机器通信,或复杂的客户端-服务的模式:套接字(Unix域套接字更高效,TCP/IP套接字最通用)。
5.总结
IPC是C++系统编程中一个强大而复杂的领域。掌握这些机制将大大提升你的设计和实现高并发、高可用、模块化系统的能力。从简单的管道到高效的共享内存,再到灵活的套接字,每种机制都有独特的应用场景和优缺点。理解它们的基本原理,并结合实际案例进行练习,你将能够游刃有余地在C++中构建出健壮的多进程应用程序。
希望这篇博客对你的C++ IPC学习之旅有所帮助!动手尝试这些代码,才能真正掌握这些知识。
注解
1.上下文切换开销大是什么意思?
上下文切换开销大意味着操作系统在多进程间切换时,用于保存当前进程状态、加载下一个进程状态、更新内存管理信息(特别是TLB和页表)、处理CPU缓存未命中等操作所消耗的CPU时间和资源较多。这种开销会降低系统的整体吞吐量,尤其是在进程切换非常频繁的场景下。因此,在设计并发程序时,如果任务间需要高度共享数据且对切换延迟敏感,可能会优先考虑多线程;如果需要强隔离性、独立性或利用多核并行处理不相关的任务,则多进程时合适的选择,但需要意识到其切换开销。
2.啥是原子操作?
原子操作指的是在多线程或多进程环境中,一个或一系列操作妖魔完全执行,要么完全不执行,并且在执行过程中不会被其他线程/进程打断。它就像一个不可分割的最小单元,从其他线程/进程的角度看,这个操作是瞬间完成的。