揭秘C++ IPC:多进程协作的艺术与实践

在现代软件开发中,仅仅依靠单线程程序已经难以满足日益复杂的需求。尤其是在需要高并发、高可用性或者需要在隔离不同功能模块的场景下,多进程编程(也就是我们今天要深入探讨的IPC — intert-Process-Communication,进程间通信)成为了必不可少的技能。

本文将带领你深入理解C++中的IPC机制,从理论到实践,从简单到复杂,通过详细的案例代码,让你彻底掌握多进程协作的艺术。

1.为什么需要IPC?理解多进程的诱惑

我们先来思考一下,为什么不直接使用多线程,而要引入更复杂的IPC呢?

  • 资源隔离与稳定性:每个进程都有独立的地址空间。一个进程崩溃通常不会影响到其他进程,这大大提高了系统 的健壮性。例如,浏览器中每个标签页通常是一个独立的进程。
  • 安全性:不同进程之间的数据默认是隔离的,这使得数据访问更加可控,降低了恶意攻击的风险。
  • 利用多核CPU: 真正的并行执行,不同进程可以在不同的CPU核心上同时运行不同的指令序列,充分利用现代多核处理器的性能。
  • 模块化与解耦:可以将大型系统拆分为多个独立的、职责单一的进程,降低系统复杂度,便于维护和升级。
  • 继承性操作:在Linux/Unix系统中,fork()创建的子进程会继承父进程的大部分资源,这为某些特定的编程模式提供了便利。

当然,与多线程相比,多进程的开销(创建,销毁,上下文切换(看注解))通常更大,进程间通信也更复杂。因此,选择多线程还是多进程,取决于你的具体应用场景和权衡。

2.IPC的基石:共享数据与消息传递

无论采用何种IPC方式,其核心思想无非两种:

1.共享数据:多个进程访问同一块物理内存或文件

2.消息传递:进程之间通过发送和接收消息来交换信息

接下来,我们将围绕这两种核心思想,介绍C++中常用的IPC机制。

3.C++IPC实践:深入各种机制

C++本身不像Java有高级的并发库,其IPC能力通常依赖于底层操作系统提供的API。我们将主要聚焦于Linux/Unix环境下的IPC机制,因为它们是C++IPC中最常用、最灵活的。

3.1 管道:最简单的双向通信

管道是最早、最简单的IPC形式。他分为两种:

  1. 匿名管道:只能用于具有亲缘关系的进程(父子、兄弟),因为它依赖于文件描述符的继承。
  2. 命名管道:可以用于任意两个进程,即使它们之间没有亲缘关系。它在文件系统中有一个对应的路径。

优点:简单易用

缺点:效率不高(数据需要拷贝),半双工(通常),匿名管道限制多。

案例:匿名管道

匿名管道通过pipe()函数创建一对文件描述符:一个用于读取,一个用于写入。

场景:父进程发送数据给子进程,子进程读取并打印。

#include <iostream>
#include <unistd.h> // For pipe(), fork(), read(), write(), close()
#include <sys/wait.h> // For wait()
#include <string>
#include <cstring> // For strlen()

int main() {
   int pipe_fd[2]; // pipe_fd[0] for read, pipe_fd[1] for write
   pid_t pid;
   const int BUFFER_SIZE = 256;
   char buffer[BUFFER_SIZE];
   const char* message = "Hello from parent process!";

   // 1. 创建管道
   if (pipe(pipe_fd) == -1) {
       perror("pipe");
       return 1;
  }

   // 2. 创建子进程
   pid = fork();

   if (pid == -1) {
       perror("fork");
       return 1;
  }

   if (pid == 0) { // 子进程
       // 关闭写入端,因为子进程只从管道读取
       close(pipe_fd[1]);

       std::cout << "Child process: Waiting for data..." << std::endl;
       // 3. 子进程从管道读取数据
       ssize_t bytes_read = read(pipe_fd[0], buffer, BUFFER_SIZE - 1);
       if (bytes_read > 0) {
           buffer[bytes_read] = '\0'; // Null-terminate the string
           std::cout << "Child process received: '" << buffer << "'" << std::endl;
      } else if (bytes_read == 0) {
           std::cout << "Child process: Parent closed pipe." << std::endl;
      } else {
           perror("read");
      }

       // 关闭读取端
       close(pipe_fd[0]);
       exit(0); // 子进程退出
  } else { // 父进程
       // 关闭读取端,因为父进程只向管道写入
       close(pipe_fd[0]);

       std::cout << "Parent process: Sending data..." << std::endl;
       // 3. 父进程向管道写入数据
       if (write(pipe_fd[1], message, strlen(message)) == -1) {
           perror("write");
      }
       std::cout << "Parent process: Data sent." << std::endl;

       // 关闭写入端,发送EOF信号给子进程
       close(pipe_fd[1]);

       // 等待子进程结束
       wait(NULL);
       std::cout << "Parent process: Child process finished." << std::endl;
  }

   return 0;
}

运行结果:

Parent process: Sending data...
Parent process: Data sent.
Child process: Waiting for data...
Child process received: 'Hello from parent process!'
Parent process: Child process finished.

代码解析:

pipe(pipe_fd):创建管道。pipe_fd[0]是读端,pipe_fd[1]是写端。

fork():创建子进程。

关键点:父子进程都需要关闭它们不使用的那一端管道。

子进程关闭pipe_fd[1](写端),因为它只读。

父进程关闭pipe_fd[0](读端),因为它只写。

read()/write():像操作文件一样读写管道。

当管道的写入端全部关闭时,read()会返回0,表示文件结束符(EOF)。

案例:命名管道

命名管道(FIFO)通过文件系统路径来识别。

场景:一个进程作为写入者,另一个进程作为读取者。可以先启动读取者,再启动写入者。

fifo_writer.cpp

#include <iostream>
#include <fcntl.h> // For O_WRONLY
#include <sys/stat.h> // For S_IRUSR, S_IWUSR
#include <unistd.h> // For write()
#include <cstring> // For strlen()

const char* FIFO_PATH = "/tmp/my_fifo";

int main() {
   // 1. 创建命名管道
   // 如果fifo不存在,则创建。0666表示读写权限
   if (mkfifo(FIFO_PATH, 0666) == -1) {
       if (errno != EEXIST) { // 如果错误不是因为文件已存在
           perror("mkfifo");
           return 1;
      }
       std::cout << "FIFO already exists: " << FIFO_PATH << std::endl;
  } else {
       std::cout << "FIFO created successfully: " << FIFO_PATH << std::endl;
  }

   // 2. 打开命名管道用于写入
   int fd = open(FIFO_PATH, O_WRONLY);
   if (fd == -1) {
       perror("open for write");
       return 1;
  }
   std::cout << "Opened FIFO for writing." << std::endl;

   const char* message = "Hello from FIFO writer!";
   // 3. 写入数据
   if (write(fd, message, strlen(message) + 1) == -1) { // +1 for null terminator
       perror("write");
  } else {
       std::cout << "Message sent: '" << message << "'" << std::endl;
  }

   // 4. 关闭管道
   close(fd);
   std::cout << "Closed FIFO." << std::endl;

   // 清理:删除FIFO文件(可选,但推荐在不再使用时清理)
   // unlink(FIFO_PATH); // 通常在reader或一个专门的清理脚本中执行
   return 0;
}

fifo_reader.cpp

#include <iostream>
#include <fcntl.h> // For O_RDONLY
#include <sys/stat.h> // For S_IRUSR, S_IWUSR
#include <unistd.h> // For read()
#include <cstdio> // For remove()

const char* FIFO_PATH = "/tmp/my_fifo";
const int BUFFER_SIZE = 256;
char buffer[BUFFER_SIZE];

int main() {
   // 1. 打开命名管道用于读取
   // 注意:如果mkfifo被writer创建,reader可以直接通过open打开。
   // 如果reader先启动,且fifo不存在,则会失败,需要reader也先尝试mkfifo。
   // 这里我们假设writer会先确保fifo创建。
   int fd = open(FIFO_PATH, O_RDONLY);
   if (fd == -1) {
       perror("open for read");
       return 1;
  }
   std::cout << "Opened FIFO for reading. Waiting for data..." << std::endl;

   // 2. 从管道读取数据
   ssize_t bytes_read = read(fd, buffer, BUFFER_SIZE - 1);
   if (bytes_read > 0) {
       buffer[bytes_read] = '\0'; // Null-terminate
       std::cout << "Message received: '" << buffer << "'" << std::endl;
  } else if (bytes_read == 0) {
       std::cout << "End of file (writer closed FIFO)." << std::endl;
  } else {
       perror("read");
  }

   // 3. 关闭管道
   close(fd);
   std::cout << "Closed FIFO." << std::endl;

   // 4. 清理:删除FIFO文件(通常由其中一个进程负责,或者一个启动/关闭脚本)
   if (remove(FIFO_PATH) == -1) {
       perror("remove fifo");
  } else {
       std::cout << "FIFO removed: " << FIFO_PATH << std::endl;
  }

   return 0;
}

编译和运行:

g++ fifo_writer.cpp -o writer
g++ fifo_reader.cpp -o reader

# 打开两个终端
# 终端1: 启动reader,它会阻塞直到writer写入
./reader

# 终端2: 启动writer
./writer

运行结果:

终端一(reader):

Opened FIFO for reading. Waiting for data...
Message received: 'Hello from FIFO writer!'
Closed FIFO.
FIFO removed: /tmp/my_fifo

终端2(writer):

FIFO created successfully: /tmp/my_fifo  # 或者 FIFO already exists...
Opened FIFO for writing.
Message sent: 'Hello from FIFO writer!'
Closed FIFO.

代码解析:

mkfifo(FIFO_PATH,0666):在FIFO_PATH创建命名管道文件。注意0666是文件权限。如果管道文件不存在,mkfifo会创建它;如果已存在,则什么也不做(如果errno是EEXIST)。

open(FIFO_PATH,0_WRONLY)/open(FIFO_PATH,0_RDONLY):通过路径打开命名管道,像常规文件一样进行读写。

remove(FIFO_PATH):删除文件系统中的命名管道。这通常在不需要时进行。

总结

管道有点类似于传小纸条进行收发消息。

3.2 消息队列:结构化消息传递

消息队列允许进程以非阻塞方式发送和接受离散消息。消息可以有类型,这使得接收方可以选择性地接受特定类型的消息。

优点:结构化消息,消息类型过滤,避免了管道的字节流特性,可以处理优先级

缺点:消息大小有限制,需要清理IPC资源。

两种主要类型:System V消息队列(较老)和POSIX消息队列(较新,推荐)

案例:POSIX消息队列

POSIX消息队列使用mq_open,mq_send,mq_receive,mq_close,mq_unlink等函数。

场景:一个进程发送消息,另一个进程接收消息。

mq_sender.cpp

#include <iostream>
#include <mqueue.h> // For mq functions
#include <cstring> // For strlen()
#include <fcntl.h> // For O_CREAT, O_RDWR
#include <sys/stat.h> // For S_IRUSR, S_IWUSR

const char* MQ_NAME = "/my_message_queue";
const int MAX_MSGS = 10;
const int MSG_SIZE = 256;

int main() {
   mqd_t mq; // Message queue descriptor
   struct mq_attr attr;

   // 1. 设置消息队列属性
   attr.mq_flags = 0; // Blocking
   attr.mq_maxmsg = MAX_MSGS;
   attr.mq_msgsize = MSG_SIZE;
   attr.mq_curmsgs = 0; // Current messages in queue (ignored for open)

   // 2. 打开或创建消息队列
   // O_CREAT: 如果不存在则创建。 O_RDWR: 读写权限。 S_IRUSR|S_IWUSR: 用户读写权限
   mq = mq_open(MQ_NAME, O_CREAT | O_RDWR, 0644, &attr);
   if (mq == (mqd_t)-1) {
       perror("mq_open");
       return 1;
  }
   std::cout << "Message queue opened/created: " << MQ_NAME << std::endl;

   const char* messages[] = {
       "Hello from sender!",
       "This is the second message.",
       "Goodbye!"
  };
   unsigned int priority = 10; // Message priority

   // 3. 发送消息
   for (int i = 0; i < 3; ++i) {
       if (mq_send(mq, messages[i], strlen(messages[i]) + 1, priority) == -1) {
           perror("mq_send");
      } else {
           std::cout << "Sent message: '" << messages[i] << "' with priority " << priority << std::endl;
      }
       priority--; // Just for demonstration, can be constant
       usleep(500000); // 0.5 sec delay
  }

   // 4. 关闭消息队列
   if (mq_close(mq) == -1) {
       perror("mq_close");
  }
   std::cout << "Message queue closed." << std::endl;

   // 5. 解链消息队列(可选,通常由接收方或一个清理脚本负责)
   // if (mq_unlink(MQ_NAME) == -1) {
   //     perror("mq_unlink");
   // } else {
   //     std::cout << "Message queue unlinked." << std::endl;
   // }

   return 0;
}

mq_receiver.cpp

#include <iostream>
#include <mqueue.h> // For mq functions
#include <cstring> // For strlen()
#include <fcntl.h> // For O_RDONLY
#include <sys/stat.h> // For S_IRUSR, S_IWUSR
#include <unistd.h> // For usleep()

const char* MQ_NAME = "/my_message_queue";
const int MSG_SIZE = 256; // Must match sender's msgsize

int main() {
   mqd_t mq;
   char buffer[MSG_SIZE];
   unsigned int priority;

   // 1. 打开消息队列
   mq = mq_open(MQ_NAME, O_RDONLY); // 必须以只读模式打开
   if (mq == (mqd_t)-1) {
       perror("mq_open");
       return 1;
  }
   std::cout << "Message queue opened: " << MQ_NAME << std::endl;

   // 2. 获取消息队列属性(可选,用于确认消息大小等)
   struct mq_attr attr;
   if (mq_getattr(mq, &attr) == -1) {
       perror("mq_getattr");
       mq_close(mq);
       mq_unlink(MQ_NAME); // 尝试清理
       return 1;
  }
   std::cout << "Queue maxmsg: " << attr.mq_maxmsg << ", msgsize: " << attr.mq_msgsize << std::endl;

   // 3. 循环接收消息
   std::cout << "Waiting for messages..." << std::endl;
   while (true) {
       ssize_t bytes_read = mq_receive(mq, buffer, attr.mq_msgsize, &priority);
       if (bytes_read == -1) {
           if (errno == EAGAIN) { // No message available in non-blocking mode
               // In blocking mode, this only happens if queue was made non-blocking
               // or if an interrupt occurred. For blocking mode, it will just wait.
               // For demonstration, we'll break on error or timeout if that was configured.
               std::cout << "No messages currently." << std::endl;
               usleep(100000); // Wait a bit before trying again
               continue;
          }
           perror("mq_receive");
           break; // Exit on other errors
      } else if (bytes_read == 0) {
           // This case might not be reached for POSIX MQ like pipes if queue is empty
           // and no more sender. mq_receive blocks.
           std::cout << "Received 0 bytes, possibly queue shutdown or error." << std::endl;
           break;
      } else {
           buffer[bytes_read] = '\0'; // Null-terminate
           std::cout << "Received message: '" << buffer << "' (Priority: " << priority << ")" << std::endl;
      }
       if (strcmp(buffer, "Goodbye!") == 0) {
           std::cout << "Exiting on 'Goodbye!' message." << std::endl;
           break;
      }
  }

   // 4. 关闭消息队列
   if (mq_close(mq) == -1) {
       perror("mq_close");
  }
   std::cout << "Message queue closed." << std::endl;

   // 5. 解链消息队列 (清理)
   if (mq_unlink(MQ_NAME) == -1) {
       perror("mq_unlink");
  } else {
       std::cout << "Message queue unlinked." << std::endl;
  }

   return 0;
}

编译和运行:(需要链接实时库lrt)

g++ mq_sender.cpp -o sender -lrt
g++ mq_receiver.cpp -o receiver -lrt

# 独立启动
# 终端1:
./receiver

# 终端2:
./sender

运行结果:

终端1(receiver):

Message queue opened: /my_message_queue
Queue maxmsg: 10, msgsize: 256
Waiting for messages...
Received message: 'Hello from sender!' (Priority: 10)
Received message: 'This is the second message.' (Priority: 9)
Received message: 'Goodbye!' (Priority: 8)
Exiting on 'Goodbye!' message.
Message queue closed.
Message queue unlinked.

终端2(sender):

Message queue opened/created: /my_message_queue
Sent message: 'Hello from sender!' with priority 10
Sent message: 'This is the second message.' with priority 9
Sent message: 'Goodbye!' with priority 8
Message queue closed.

代码解析:

mq_open():打开或创建消息队列。O_CREAT表示如果不存在则创建,attr参数用于设置队列的特性,如mq_maxmsg(最大消息数)和mq_msgsize(最大消息大小)。

mq_send():发送消息。第三个参数是消息内容,第四个参数是消息的优先级。

mq_receive():接收消息。消息按优先级从高到低出队。

mq_close():关闭消息队列。

mq_unlink():从系统上删除消息队列。

3.3共享内存:最快速的IPC

共享内存是最高效的IPC机制,因为它允许不同进程直接访问同一块物理内存。一旦内存映射完成,进程间就不再需要系统调用来传递数据,直接读写内存即可。

优点:速度最快,无需数据拷贝。

缺点:

  • 同步问题:多个进程同时访问共享内存区域时,需要严格的同步机制(如互斥锁、信号量)来避免数据竞争和不一致。
  • 管理复杂:需要手动管理共享内存的创建、映射、解除映射和销毁。
  • 生命周期:共享内存可以独立于创建它的进程而存在。

两种主要类型:System V共享内存(较老)和POSIX共享内存(较新,推荐)

案例:POSIX共享内存

POSIX共享内存使用/dev/shm(通常是临时的文件系统)作为后端。

场景:一个进程写入数据到共享内存,另一个进程从共享内存读取数据。

shm_writer.cpp

#include <iostream>
#include <sys/mman.h> // For shm_open, mmap, munmap, shm_unlink
#include <fcntl.h> // For O_CREAT, O_RDWR
#include <unistd.h> // For ftruncate
#include <cstring> // For strcpy, strlen
#include <semaphore.h> // For sem_t, sem_open, sem_post, sem_wait, sem_unlink

const char* SHM_NAME = "/my_shared_memory";
const char* SEM_NAME = "/my_sem";
const int SHM_SIZE = 1024;

int main() {
   int shm_fd;
   char* shm_ptr;
   sem_t* sem;

   // 1. 创建/打开共享内存对象
   shm_fd = shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666);
   if (shm_fd == -1) {
       perror("shm_open");
       return 1;
  }
   std::cout << "Shared memory object opened/created: " << SHM_NAME << std::endl;

   // 2. 设置共享内存大小
   if (ftruncate(shm_fd, SHM_SIZE) == -1) {
       perror("ftruncate");
       shm_unlink(SHM_NAME);
       return 1;
  }
   std::cout << "Shared memory size set to " << SHM_SIZE << " bytes." << std::endl;

   // 3. 将共享内存映射到进程地址空间
   shm_ptr = (char*)mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
   if (shm_ptr == MAP_FAILED) {
       perror("mmap");
       shm_unlink(SHM_NAME);
       return 1;
  }
   std::cout << "Shared memory mapped at address: " << (void*)shm_ptr << std::endl;

   // 4. 打开/创建信号量用于同步
   // 1表示信号量初始值,供一个进程使用
   sem = sem_open(SEM_NAME, O_CREAT | O_RDWR, 0666, 0); // Initial value 0, means consumer must wait
   if (sem == SEM_FAILED) {
       perror("sem_open");
       munmap(shm_ptr, SHM_SIZE);
       shm_unlink(SHM_NAME);
       return 1;
  }
   std::cout << "Semaphore opened/created: " << SEM_NAME << std::endl;

   // 5. 写入数据到共享内存
   const char* message = "Hello from shared memory writer!";
   strncpy(shm_ptr, message, SHM_SIZE);
   shm_ptr[SHM_SIZE - 1] = '\0'; // Ensure null termination
   std::cout << "Wrote message to shared memory: '" << shm_ptr << "'" << std::endl;

   // 6. 发布信号量,通知读取者数据已准备好
   if (sem_post(sem) == -1) {
       perror("sem_post");
  }
   std::cout << "Semaphore posted. Reader can now access data." << std::endl;

   // 7. 解除内存映射
   if (munmap(shm_ptr, SHM_SIZE) == -1) {
       perror("munmap");
  }
   std::cout << "Shared memory unmapped." << std::endl;

   // 8. 关闭共享内存文件描述符
   close(shm_fd);
   std::cout << "Shared memory fd closed." << std::endl;

   // 9. 关闭信号量 (不删除)
   if (sem_close(sem) == -1) {
       perror("sem_close");
  }
   std::cout << "Semaphore closed (not unlinked)." << std::endl;

   // 一般由接收方或一个专门的清理脚本负责 unlink
   // shm_unlink(SHM_NAME);
   // sem_unlink(SEM_NAME);

   return 0;
}

shm_reader.cpp

#include <iostream>
#include <sys/mman.h> // For shm_open, mmap, munmap, shm_unlink
#include <fcntl.h> // For O_RDWR
#include <unistd.h> // For close
#include <cstring> // For strlen
#include <semaphore.h> // For sem_t, sem_open, sem_post, sem_wait, sem_unlink

const char* SHM_NAME = "/my_shared_memory";
const char* SEM_NAME = "/my_sem";
const int SHM_SIZE = 1024;

int main() {
   int shm_fd;
   char* shm_ptr;
   sem_t* sem;

   // 1. 打开共享内存对象
   shm_fd = shm_open(SHM_NAME, O_RDWR, 0666); // 确保读写权限
   if (shm_fd == -1) {
       perror("shm_open");
       return 1;
  }
   std::cout << "Shared memory object opened: " << SHM_NAME << std::endl;

   // 2. 将共享内存映射到进程地址空间
   shm_ptr = (char*)mmap(NULL, SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
   if (shm_ptr == MAP_FAILED) {
       perror("mmap");
       close(shm_fd);
       shm_unlink(SHM_NAME); // 失败时清理
       return 1;
  }
   std::cout << "Shared memory mapped at address: " << (void*)shm_ptr << std::endl;

   // 3. 打开信号量
   sem = sem_open(SEM_NAME, O_RDWR);
   if (sem == SEM_FAILED) {
       perror("sem_open");
       munmap(shm_ptr, SHM_SIZE);
       close(shm_fd);
       shm_unlink(SHM_NAME); // 失败时清理
       return 1;
  }
   std::cout << "Semaphore opened: " << SEM_NAME << std::endl;

   // 4. 等待信号量,直到写入者写入数据
   std::cout << "Waiting for semaphore (data ready)..." << std::endl;
   if (sem_wait(sem) == -1) {
       perror("sem_wait");
       // Clean up partially created resources
       munmap(shm_ptr, SHM_SIZE);
       close(shm_fd);
       shm_unlink(SHM_NAME);
       sem_close(sem);
       sem_unlink(SEM_NAME); // 信号量也要清理
       return 1;
  }
   std::cout << "Semaphore acquired. Reading data..." << std::endl;

   // 5. 从共享内存读取数据
   std::cout << "Read from shared memory: '" << shm_ptr << "'" << std::endl;

   // 6. 解除内存映射
   if (munmap(shm_ptr, SHM_SIZE) == -1) {
       perror("munmap");
  }
   std::cout << "Shared memory unmapped." << std::endl;

   // 7. 关闭共享内存文件描述符
   close(shm_fd);
   std::cout << "Shared memory fd closed." << std::endl;

   // 8. 关闭信号量
   if (sem_close(sem) == -1) {
       perror("sem_close");
  }
   std::cout << "Semaphore closed." << std::endl;

   // 9. 解链共享内存和信号量
   if (shm_unlink(SHM_NAME) == -1) {
       perror("shm_unlink");
  } else {
       std::cout << "Shared memory unlinked: " << SHM_NAME << std::endl;
  }

   if (sem_unlink(SEM_NAME) == -1) {
       perror("sem_unlink");
  } else {
       std::cout << "Semaphore unlinked: " << SEM_NAME << std::endl;
  }

   return 0;
}

编译和运行:(需要链接实时库lrt)

g++ shm_writer.cpp -o writer -lrt
g++ shm_reader.cpp -o reader -lrt

# 独立启动
# 终端1: 启动reader,它会阻塞直到writer写入
./reader

# 终端2: 启动writer
./writer

运行结果:

终端1(reader):

Shared memory object opened: /my_shared_memory
Shared memory mapped at address: 0x7fa2c40c3000
Semaphore opened: /my_sem
Waiting for semaphore (data ready)...
Semaphore acquired. Reading data...
Read from shared memory: 'Hello from shared memory writer!'
Shared memory unmapped.
Shared memory fd closed.
Semaphore closed.
Shared memory unlinked: /my_shared_memory
Semaphore unlinked: /my_sem

终端2(writer):

Shared memory object opened/created: /my_shared_memory
Shared memory size set to 1024 bytes.
Shared memory mapped at address: 0x7f09f9872000
Semaphore opened/created: /my_sem
Wrote message to shared memory: 'Hello from shared memory writer!'
Semaphore posted. Reader can now access data.
Shared memory unmapped.
Shared memory fd closed.
Semaphore closed (not unlinked).

代码解析:

shm_open():打开或创建共享内存对象。它返回一个文件描述符。

ftruncate():设置共享内存对象大小。这是必要的,因为shm_open只创建了元数据,没有分配实际空间。

mmap():将共享内存文件描述符映射到进程的虚拟地址空间。MAP_SHARED表示多个进程可以共享这个映射。

munmap():接触内存映射。

shm_unlink():从系统中删除共享内存对象。此时,即使有进程还在映射,这个对象也会被标记为删除,当所有映射都解除后,物理内存才会被释放。

同步机制(命名信号量)

sem_open():创建或打开一个命名信号量。

sem_wait():P操作,信号量减1。如果信号量值为0,调用进程会阻塞。

sem_post():V操作,信号量加1.

sem_close():关闭信号量。

sem_unlink():从系统中删除命名信号量。

在这个例子中,信号量控制了读写顺序,写入着sem_post后,读取者才能sem_wait并读取。初始值为0,确保读取者先阻塞。

3.4 信号量:进程同步利器

信号量不仅仅是共享内存的辅助,它们本身也是一种重要的IPC机制,主要用于进程间的同步和互斥。

信号量是一个非负整数,由wait(P操作,减1)和post(V操作,加1)两个原子操作来控制。

二值信号量:值只有0和1,可作为互斥锁使用。

计数信号量:值可以大于1,用于管理有限的资源。

上面的共享内存的例子中已经展示了命名信号量的用法。

优点:灵活的同步机制,可以控制资源访问,支持生产者-消费者模型

缺点:容易死锁,需要细心管理。

3.5信号:简单的异步通知

信号是最低级的IPC机制,用于向进程发送异步通知。它不是用于传输大量数据,而是用于通知事件的发生,比如SIGINT(Ctrl+C),SIGTERM(终止进程),SIGCHLD(子进程状态改变)。

优点:简单、异步、操作系统级别支持

缺点:只能传递少量信息(通常只有信号编号),处理复杂(需要注册信号处理函数),不适合常规数据通信。

案例:信号(kill(),signal())

场景:一个进程发送信号通知另一个进程某个事件发生。

signal_sender.cpp

#include <iostream>
#include <signal.h> // For kill()
#include <unistd.h> // For sleep()

int main(int argc, char* argv[]) {
   if (argc != 2) {
       std::cerr << "Usage: " << argv[0] << " <pid_of_receiver>" << std::endl;
       return 1;
  }

   pid_t receiver_pid = std::stoi(argv[1]); // Convert string to PID

   std::cout << "Sender: Sending SIGUSR1 to PID " << receiver_pid << " in 2 seconds..." << std::endl;
   sleep(2); // Give receiver time to set up

   // 发送用户自定义信号1 (SIGUSR1)
   if (kill(receiver_pid, SIGUSR1) == -1) {
       perror("kill");
       return 1;
  }
   std::cout << "Sender: SIGUSR1 sent successfully." << std::endl;

   std::cout << "Sender: Sending SIGTERM to PID " << receiver_pid << " in 3 seconds to terminate..." << std::endl;
   sleep(3);

   // 发送终止信号 (SIGTERM)
   if (kill(receiver_pid, SIGTERM) == -1) {
       perror("kill");
       return 1;
  }
   std::cout << "Sender: SIGTERM sent successfully. Receiver should terminate." << std::endl;

   return 0;
}

signal_receiver.cpp

#include <iostream>
#include <signal.h> // For signal(), sigaction()
#include <unistd.h> // For getpid(), pause()
#include <atomic> // For atomic<bool>

// 使用原子变量来安全地通知主循环退出
std::atomic<bool> keep_running(true);

// 信号处理函数
void signal_handler(int signum) {
   if (signum == SIGUSR1) {
       std::cout << "\nReceiver: Received SIGUSR1 (User-defined signal 1)!" << std::endl;
       // 可以在这里执行一些轻量级操作
  } else if (signum == SIGTERM) {
       std::cout << "\nReceiver: Received SIGTERM (Termination signal)! Preparing to exit." << std::endl;
       keep_running = false; // 通知主循环退出
  } else {
       std::cout << "\nReceiver: Received unexpected signal: " << signum << std::endl;
  }
}

int main() {
   std::cout << "Receiver PID: " << getpid() << std::endl;

   // 注册信号处理函数
   // 使用sigaction()更强大和推荐,因为它允许更多控制(如sa_flags)
   struct sigaction sa;
   sa.sa_handler = signal_handler; // 设置信号处理函数
   sigemptyset(&sa.sa_mask);       // 在信号处理函数执行期间不阻塞其他信号
   sa.sa_flags = 0;                // 0表示默认行为

   if (sigaction(SIGUSR1, &sa, NULL) == -1) {
       perror("sigaction SIGUSR1");
       return 1;
  }
   if (sigaction(SIGTERM, &sa, NULL) == -1) {
       perror("sigaction SIGTERM");
       return 1;
  }

   std::cout << "Receiver: Waiting for signals... (Looping)" << std::endl;

   // 主循环,持续运行直到被信号处理函数改变keep_running
   while (keep_running) {
       // 在这里可以执行一些其他操作,或者仅仅是等待
       std::cout << "Receiver: Working... (PID: " << getpid() << ")" << std::endl;
       sleep(1); // 模拟工作
  }

   std::cout << "Receiver: Exiting gracefully." << std::endl;

   return 0;
}

编译和运行

g++ signal_sender.cpp -o sender
g++ signal_receiver.cpp -o receiver

# 终端1: 启动receiver
./receiver

# 终端2: 查看receiver的PID,然后用sender发送信号
# ps aux | grep receiver # 找到PID
# ./sender <receiver_pid>

运行结果:

终端1(receiver):

Receiver PID: 12345 (实际PID)
Receiver: Waiting for signals... (Looping)
Receiver: Working... (PID: 12345)
Receiver: Working... (PID: 12345)
Receiver: Received SIGUSR1 (User-defined signal 1)!
Receiver: Working... (PID: 12345)
Receiver: Working... (PID: 12345)
Receiver: Received SIGTERM (Termination signal)! Preparing to exit.
Receiver: Exiting gracefully.

终端2(sender):

Sender: Sending SIGUSR1 to PID 12345 in 2 seconds...
Sender: SIGUSR1 sent successfully.
Sender: Sending SIGTERM to PID 12345 in 3 seconds to terminate...
Sender: SIGTERM sent successfully. Receiver should terminate.

代码解析:

getpid():获取当前进程的PID

kill(pid,signum):向指定PID的进程发送指定信号。

sigaction(signum,&sa,NULL):注册信号处理函数。sa.sa_handler是我们定义的函数,它将在接收到信号时被调用。

信号处理函数注意事项:信号处理函数应该是“异步信号安全”的。这意味着在函数内部,只应调用那些在信号处理程序中安全的函数(通常是可重入的,如write,_exit,sleep等)。不应进行复杂的内存分配、IO操作或非原子操作,因为它们可能不是原子的,或者在信号处理函数的执行期间中断了主程序的执行,导致了不可预测的行为。这里使用std::atomic<bool>是安全的。

pause():使进程挂起,直到接收到信号。在我们的例子中,主循环用sleep(1)和keep_running来演示。

3.6 套接字(Sockets):网络通信的基石

套接字使IPC中最通用、最强大的机制,因为它不仅适用于同一台机器上的进程,也适用于网络中不同及其上的进程。

  • Unix域套接字:仅用于同一台机器上的进程通信,但比TCP/IP套接字效率更高,因为它们绕过了网络协议栈。
  • TCP/IP套接字:跨机器通信,网络通信的基石。

优点:极其通用,支持网络透明,面向连接和无连接通信。

缺点:相对复杂。涉及网络协议栈,性能不如共享内存快。

案例:Unix域套接字(AF_UNIX)

UDS在文件系统中由对应的路径,像文件一样。

场景:一个进程作为服务器监听Unix域套接字,另一个进程作为客户端连接并发送消息。

uds_server.cpp

#include <iostream>
#include <sys/socket.h> // For socket(), bind(), listen(), accept()
#include <sys/un.h>     // For sockaddr_un
#include <unistd.h>     // For close(), unlink()
#include <cstring>      // For memset(), strlen()

const char* SOCKET_PATH = "/tmp/my_uds_socket";
const int BUFFER_SIZE = 256;

int main() {
   int server_fd, client_fd;
   struct sockaddr_un server_addr, client_addr;
   socklen_t client_len;
   char buffer[BUFFER_SIZE];

   // 1. 创建套接字
   server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
   if (server_fd == -1) {
       perror("socket");
       return 1;
  }
   std::cout << "Server: Socket created." << std::endl;

   // 2. 配置服务器地址
   memset(&server_addr, 0, sizeof(server_addr));
   server_addr.sun_family = AF_UNIX;
   strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1);

   // 3. 移除旧的套接字文件(如果存在)
   unlink(SOCKET_PATH); // Crucial for UDS servers

   // 4. 绑定套接字到文件路径
   if (bind(server_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
       perror("bind");
       close(server_fd);
       return 1;
  }
   std::cout << "Server: Bound to " << SOCKET_PATH << std::endl;

   // 5. 监听连接
   if (listen(server_fd, 5) == -1) { // Max 5 pending connections
       perror("listen");
       close(server_fd);
       unlink(SOCKET_PATH);
       return 1;
  }
   std::cout << "Server: Listening for connections..." << std::endl;

   // 6. 接受客户端连接
   client_len = sizeof(client_addr);
   client_fd = accept(server_fd, (struct sockaddr*)&client_addr, &client_len);
   if (client_fd == -1) {
       perror("accept");
       close(server_fd);
       unlink(SOCKET_PATH);
       return 1;
  }
   std::cout << "Server: Accepted client connection." << std::endl;

   // 7. 从客户端接收数据
   ssize_t bytes_read = recv(client_fd, buffer, BUFFER_SIZE - 1, 0);
   if (bytes_read == -1) {
       perror("recv");
  } else if (bytes_read == 0) {
       std::cout << "Server: Client disconnected." << std::endl;
  } else {
       buffer[bytes_read] = '\0'; // Null-terminate
       std::cout << "Server: Received: '" << buffer << "'" << std::endl;

       // 8. 向客户端发送响应
       const char* response_msg = "Message received, server says hello back!";
       if (send(client_fd, response_msg, strlen(response_msg) + 1, 0) == -1) {
           perror("send");
      } else {
           std::cout << "Server: Sent response: '" << response_msg << "'" << std::endl;
      }
  }

   // 9. 关闭套接字
   close(client_fd);
   std::cout << "Server: Client socket closed." << std::endl;
   close(server_fd);
   std::cout << "Server: Server socket closed." << std::endl;

   // 10. 删除套接字文件
   if (unlink(SOCKET_PATH) == -1) {
       perror("unlink socket file");
  } else {
       std::cout << "Server: Socket file " << SOCKET_PATH << " removed." << std::endl;
  }

   return 0;
}

uds_client.cpp

#include <iostream>
#include <sys/socket.h> // For socket(), connect()
#include <sys/un.h>     // For sockaddr_un
#include <unistd.h>     // For close()
#include <cstring>      // For memset(), strlen()

const char* SOCKET_PATH = "/tmp/my_uds_socket";
const int BUFFER_SIZE = 256;

int main() {
   int client_fd;
   struct sockaddr_un server_addr;
   char buffer[BUFFER_SIZE];

   // 1. 创建套接字
   client_fd = socket(AF_UNIX, SOCK_STREAM, 0);
   if (client_fd == -1) {
       perror("socket");
       return 1;
  }
   std::cout << "Client: Socket created." << std::endl;

   // 2. 配置服务器地址
   memset(&server_addr, 0, sizeof(server_addr));
   server_addr.sun_family = AF_UNIX;
   strncpy(server_addr.sun_path, SOCKET_PATH, sizeof(server_addr.sun_path) - 1);

   // 3. 连接到服务器
   std::cout << "Client: Attempting to connect to " << SOCKET_PATH << std::endl;
   if (connect(client_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
       perror("connect");
       close(client_fd);
       return 1;
  }
   std::cout << "Client: Connected to server." << std::endl;

   // 4. 向服务器发送数据
   const char* message = "Hello from Unix Domain Socket client!";
   if (send(client_fd, message, strlen(message) + 1, 0) == -1) {
       perror("send");
  } else {
       std::cout << "Client: Sent: '" << message << "'" << std::endl;
  }

   // 5. 从服务器接收响应
   ssize_t bytes_read = recv(client_fd, buffer, BUFFER_SIZE - 1, 0);
   if (bytes_read == -1) {
       perror("recv");
  } else if (bytes_read == 0) {
       std::cout << "Client: Server disconnected." << std::endl;
  } else {
       buffer[bytes_read] = '\0'; // Null-terminate
       std::cout << "Client: Received response: '" << buffer << "'" << std::endl;
  }

   // 6. 关闭套接字
   close(client_fd);
   std::cout << "Client: Socket closed." << std::endl;

   return 0;
}

编译运行:

g++ uds_server.cpp -o server
g++ uds_client.cpp -o client

# 独立启动
# 终端1: 启动server
./server

# 终端2: 启动client
./client

运行结果:

终端1(server):

Server: Socket created.
Server: Bound to /tmp/my_uds_socket
Server: Listening for connections...
Server: Accepted client connection.
Server: Received: 'Hello from Unix Domain Socket client!'
Server: Sent response: 'Message received, server says hello back!'
Server: Client socket closed.
Server: Server socket closed.
Server: Socket file /tmp/my_uds_socket removed.

终端2(client):

Client: Socket created.
Client: Attempting to connect to /tmp/my_uds_socket
Client: Connected to server.
Client: Sent: 'Hello from Unix Domain Socket client!'
Client: Received response: 'Message received, server says hello back!'
Client: Socket closed.

代码解析:

socket(AF_UNIX,SOCK_STREAM,0):创建Unix域套接字。AF_UNIX指定地址族为Unix域,SOCK_STREAM指定套接字类型为流式(TCP)

struct sockaddr_un:Unix域套接字地址结构,其中sun_path字段存放套接字文件路径。

unlink(SOCKET_PATH):在绑定前,通常需要删除旧的套接字文件,以防止上次程序异常终止未清除。

bind():将套接字文件路径与套接字描述符关联起来。

listen():使服务器套接字进入监听状态。

accept():阻塞式等待客户端连接并接收。成功后返回一个新的套接字描述符用于客户端通信。

connect():客户端连接到指定路劲的服务器套接字。

send()/recv():通过已连接的套接字发送和接收数据。

close():关闭套接字描述符。

unlink(SOCKET_PATH):服务器在退出时,负责删除创建的套接字文件。

4.IPC选择指南

选择合适的IPC机制取决于你的具体需求:

亲缘关系进程间少量数据通信且简单:匿名管道

任意进程间少量数据通信但需要文件路径:命名管道。

结构化消息传递,消息有优先级,需要异步通信:消息队列。

大数据量、高效率通信,且能处理同步问题:共享内存(配合信号量或互斥锁)

进程间简单事件通知或控制:信号。

通用通信,跨机器通信,或复杂的客户端-服务的模式:套接字(Unix域套接字更高效,TCP/IP套接字最通用)。

5.总结

IPC是C++系统编程中一个强大而复杂的领域。掌握这些机制将大大提升你的设计和实现高并发、高可用、模块化系统的能力。从简单的管道到高效的共享内存,再到灵活的套接字,每种机制都有独特的应用场景和优缺点。理解它们的基本原理,并结合实际案例进行练习,你将能够游刃有余地在C++中构建出健壮的多进程应用程序。

希望这篇博客对你的C++ IPC学习之旅有所帮助!动手尝试这些代码,才能真正掌握这些知识。

注解

1.上下文切换开销大是什么意思?

上下文切换开销大意味着操作系统在多进程间切换时,用于保存当前进程状态、加载下一个进程状态、更新内存管理信息(特别是TLB和页表)、处理CPU缓存未命中等操作所消耗的CPU时间和资源较多。这种开销会降低系统的整体吞吐量,尤其是在进程切换非常频繁的场景下。因此,在设计并发程序时,如果任务间需要高度共享数据且对切换延迟敏感,可能会优先考虑多线程;如果需要强隔离性、独立性或利用多核并行处理不相关的任务,则多进程时合适的选择,但需要意识到其切换开销。

2.啥是原子操作?

原子操作指的是在多线程或多进程环境中,一个或一系列操作妖魔完全执行,要么完全不执行,并且在执行过程中不会被其他线程/进程打断。它就像一个不可分割的最小单元,从其他线程/进程的角度看,这个操作是瞬间完成的。

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇