SST  15.1.0
Structural Simulation Toolkit
ipctunnel.h
1 // Copyright 2009-2025 NTESS. Under the terms
2 // of Contract DE-NA0003525 with NTESS, the U.S.
3 // Government retains certain rights in this software.
4 //
5 // Copyright (c) 2009-2025, NTESS
6 // All rights reserved.
7 //
8 // This file is part of the SST software package. For license
9 // information, see the LICENSE file in the top level directory of the
10 // distribution.
11 
12 #ifndef SST_CORE_INTERPROCESS_IPCTUNNEL_H
13 #define SST_CORE_INTERPROCESS_IPCTUNNEL_H
14 
#include "sst/core/interprocess/circularBuffer.h"

#include <cerrno>
#include <cinttypes>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <errno.h>
#include <fcntl.h>
#include <new>
#include <string>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <utility>
#include <vector>
30 
31 namespace SST::Core::Interprocess {
32 
33 extern uint32_t globalIPCTunnelCount;
34 /**
35  * Tunneling class between two processes, connected by shared memory.
36  * Supports multiple circular-buffer queues, and a generic region
37  * of memory for shared data.
38  *
39  * @tparam ShareDataType Type to put in the shared data region
40  * @tparam MsgType Type of messages being sent in the circular buffers
41  */
42 template <typename ShareDataType, typename MsgType>
43 class IPCTunnel
44 {
45 
47 
48  struct InternalSharedData
49  {
50  volatile uint32_t expectedChildren;
51  size_t shmSegSize;
52  size_t numBuffers;
53  size_t offsets[0]; // Actual size: numBuffers + 2
54  };
55 
56 public:
57  /**
58  * Construct a new Tunnel for IPC Communications
59  * @param comp_id Component ID of owner
60  * @param numBuffers Number of buffers for which we should tunnel
61  * @param bufferSize How large each core's buffer should be
62  */
63  IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren = 1) :
64  master(true),
65  shmPtr(nullptr),
66  fd(-1)
67  {
68  char key[256];
69  memset(key, '\0', sizeof(key));
70  do {
71  snprintf(key, sizeof(key), "/sst_shmem_%u-%" PRIu32 "-%d", getpid(), comp_id, rand());
72  filename = key;
73 
74  fd = shm_open(filename.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
75  /* There's a rare chance that a file we are looking to use exists.
76  * It's unlikely, but perhaps a previous run (with the same PID
77  * and random number) crashed before the * clients all connected.
78  *
79  * So, if we get an error, and the error is EEXIST, try again with
80  * a different random number.
81  */
82  } while ( (fd < 0) && (errno == EEXIST) );
83  if ( fd < 0 ) {
84  // Not using Output because IPC means Output might not be available
85  fprintf(stderr, "Failed to create IPC region '%s': %s\n", filename.c_str(), strerror(errno));
86  exit(1);
87  }
88 
89  shmSize = calculateShmemSize(numBuffers, bufferSize);
90  if ( ftruncate(fd, shmSize) ) {
91  // Not using Output because IPC means Output might not be available
92  fprintf(stderr, "Resizing shared file '%s' failed: %s\n", filename.c_str(), strerror(errno));
93  exit(1);
94  }
95 
96  shmPtr = mmap(nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
97  if ( shmPtr == MAP_FAILED ) {
98  // Not using Output because IPC means Output might not be available
99  fprintf(stderr, "mmap failed: %s\n", strerror(errno));
100  exit(1);
101  }
102  nextAllocPtr = (uint8_t*)shmPtr;
103  memset(shmPtr, '\0', shmSize);
104 
105  /* Construct our private buffer first. Used for our communications */
106  auto resResult = reserveSpace<InternalSharedData>((1 + numBuffers) * sizeof(size_t));
107  isd = resResult.second;
108  isd->expectedChildren = expectedChildren;
109  isd->shmSegSize = shmSize;
110  isd->numBuffers = numBuffers;
111 
112  /* Construct user's shared-data region */
113  auto shareResult = reserveSpace<ShareDataType>(0);
114  isd->offsets[0] = shareResult.first;
115  sharedData = shareResult.second;
116 
117  /* Construct the circular buffers */
118  const size_t cbSize = sizeof(MsgType) * bufferSize;
119  for ( size_t c = 0; c < isd->numBuffers; c++ ) {
120  CircBuff_t* cPtr = nullptr;
121 
122  auto resResult = reserveSpace<CircBuff_t>(cbSize);
123  isd->offsets[1 + c] = resResult.first;
124  cPtr = resResult.second;
125  if ( !cPtr->setBufferSize(bufferSize) ) exit(1);
126  circBuffs.push_back(cPtr);
127  }
128  }
129 
130  /**
131  * Access an existing Tunnel
132  * @param region_name Name of the shared-memory region to access
133  */
134  explicit IPCTunnel(const std::string& region_name) :
135  master(false),
136  shmPtr(nullptr),
137  fd(-1)
138  {
139  fd = shm_open(region_name.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
140  filename = region_name;
141 
142  if ( fd < 0 ) {
143  // Not using Output because IPC means Output might not be available
144  fprintf(stderr, "Failed to open IPC region '%s': %s\n", filename.c_str(), strerror(errno));
145  exit(1);
146  }
147 
148  shmPtr = mmap(nullptr, sizeof(InternalSharedData), PROT_READ, MAP_SHARED, fd, 0);
149  if ( shmPtr == MAP_FAILED ) {
150  // Not using Output because IPC means Output might not be available
151  fprintf(stderr, "mmap 0 failed: %s\n", strerror(errno));
152  exit(1);
153  }
154 
155  isd = (InternalSharedData*)shmPtr;
156  shmSize = isd->shmSegSize;
157  munmap(shmPtr, sizeof(InternalSharedData));
158 
159  shmPtr = mmap(nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
160  if ( shmPtr == MAP_FAILED ) {
161  // Not using Output because IPC means Output might not be available
162  fprintf(stderr, "mmap 1 failed: %s\n", strerror(errno));
163  exit(1);
164  }
165  isd = (InternalSharedData*)shmPtr;
166  sharedData = (ShareDataType*)((uint8_t*)shmPtr + isd->offsets[0]);
167 
168  for ( size_t c = 0; c < isd->numBuffers; c++ ) {
169  circBuffs.push_back((CircBuff_t*)((uint8_t*)shmPtr + isd->offsets[c + 1]));
170  }
171 
172  /* Clean up if we're the last to attach */
173  if ( --isd->expectedChildren == 0 ) {
174  shm_unlink(filename.c_str());
175  }
176  }
177 
178  /**
179  * Destructor
180  */
181  virtual ~IPCTunnel() { shutdown(true); }
182 
183  /**
184  * Shutdown
185  */
186  void shutdown(bool all = false)
187  {
188  if ( master ) {
189  for ( CircBuff_t* cb : circBuffs ) {
190  cb->~CircBuff_t();
191  }
192  }
193  if ( shmPtr ) {
194  munmap(shmPtr, shmSize);
195  shmPtr = nullptr;
196  shmSize = 0;
197  }
198  if ( fd >= 0 ) {
199  close(fd);
200  fd = -1;
201  }
202  }
203 
204  const std::string& getRegionName() const { return filename; }
205 
206  /** return a pointer to the ShareDataType region */
207  ShareDataType* getSharedData() { return sharedData; }
208 
209  /** Blocks until space is available **/
210  void writeMessage(size_t core, const MsgType& command) { circBuffs[core]->write(command); }
211 
212  /** Blocks until a command is available **/
213  MsgType readMessage(size_t buffer) { return circBuffs[buffer]->read(); }
214 
215  /** Non-blocking version of readMessage **/
216  bool readMessageNB(size_t buffer, MsgType* result) { return circBuffs[buffer]->readNB(result); }
217 
218  /** Empty the messages in the buffer **/
219  void clearBuffer(size_t core) { circBuffs[core]->clearBuffer(); }
220 
221 private:
222  template <typename T>
223  std::pair<size_t, T*> reserveSpace(size_t extraSpace = 0)
224  {
225  size_t space = sizeof(T) + extraSpace;
226  if ( ((nextAllocPtr + space) - (uint8_t*)shmPtr) > shmSize ) return std::make_pair<size_t, T*>(0, nullptr);
227  T* ptr = (T*)nextAllocPtr;
228  nextAllocPtr += space;
229  new (ptr) T(); // Call constructor if need be
230  return std::make_pair((uint8_t*)ptr - (uint8_t*)shmPtr, ptr);
231  }
232 
233  size_t static calculateShmemSize(size_t numBuffers, size_t bufferSize)
234  {
235  long page_size = sysconf(_SC_PAGESIZE);
236 
237  /* Count how many pages are needed, at minimum */
238  size_t isd = 1 + ((sizeof(InternalSharedData) + (1 + numBuffers) * sizeof(size_t)) / page_size);
239  size_t buffer = 1 + ((sizeof(CircBuff_t) + bufferSize * sizeof(MsgType)) / page_size);
240  size_t shdata = 1 + ((sizeof(ShareDataType) + sizeof(InternalSharedData)) / page_size);
241 
242  /* Alloc 2 extra pages, just in case */
243  return (2 + isd + shdata + numBuffers * buffer) * page_size;
244  }
245 
246 protected:
247  /** Pointer to the Shared Data Region */
248  ShareDataType* sharedData;
249 
250 private:
251  bool master;
252  void* shmPtr;
253  int fd;
254 
255  std::string filename;
256  uint8_t* nextAllocPtr;
257  size_t shmSize;
258  InternalSharedData* isd;
259  std::vector<CircBuff_t*> circBuffs;
260 };
261 
262 } // namespace SST::Core::Interprocess
263 
264 #endif // SST_CORE_INTERPROCESS_IPCTUNNEL_H
void shutdown(bool all=false)
Shutdown.
Definition: ipctunnel.h:186
Definition: circularBuffer.h:20
IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren=1)
Construct a new Tunnel for IPC Communications.
Definition: ipctunnel.h:63
MsgType readMessage(size_t buffer)
Blocks until a command is available.
Definition: ipctunnel.h:213
Definition: circularBuffer.h:23
void writeMessage(size_t core, const MsgType &command)
Blocks until space is available.
Definition: ipctunnel.h:210
ShareDataType * sharedData
Pointer to the Shared Data Region.
Definition: ipctunnel.h:248
virtual ~IPCTunnel()
Destructor.
Definition: ipctunnel.h:181
bool readMessageNB(size_t buffer, MsgType *result)
Non-blocking version of readMessage.
Definition: ipctunnel.h:216
Tunneling class between two processes, connected by shared memory.
Definition: ipctunnel.h:43
ShareDataType * getSharedData()
Return a pointer to the ShareDataType region.
Definition: ipctunnel.h:207
IPCTunnel(const std::string &region_name)
Access an existing Tunnel.
Definition: ipctunnel.h:134
void clearBuffer(size_t core)
Empty the messages in the buffer.
Definition: ipctunnel.h:219