SST  11.0.0
StructuralSimulationToolkit
ipctunnel.h
1 // Copyright 2009-2021 NTESS. Under the terms
2 // of Contract DE-NA0003525 with NTESS, the U.S.
3 // Government retains certain rights in this software.
4 //
5 // Copyright (c) 2009-2021, NTESS
6 // All rights reserved.
7 //
8 // This file is part of the SST software package. For license
9 // information, see the LICENSE file in the top level directory of the
10 // distribution.
11 
12 #ifndef SST_CORE_INTERPROCESS_TUNNEL_H
13 #define SST_CORE_INTERPROCESS_TUNNEL_H 1
14 
15 
16 #include <fcntl.h>
17 #include <cstdio>
18 #include <vector>
19 #include <string>
20 #include <errno.h>
21 #include <cstring>
22 #include <sys/mman.h>
23 #include <sys/stat.h>
24 #include <unistd.h>
25 
26 #include "sst/core/interprocess/circularBuffer.h"
27 
28 namespace SST {
29 namespace Core {
30 namespace Interprocess {
31 
32 
// Declaration only: the variable is defined in some other translation unit,
// and nothing in this header reads or writes it.
// NOTE(review): presumably a process-wide count of tunnels created -- confirm
// against the file that defines it.
extern uint32_t globalIPCTunnelCount;
34 /**
35  * Tunneling class between two processes, connected by shared memory.
36  * Supports multiple circular-buffer queues, and a generic region
37  * of memory for shared data.
38  *
39  * @tparam ShareDataType Type to put in the shared data region
40  * @tparam MsgType Type of messages being sent in the circular buffers
41  */
42 template<typename ShareDataType, typename MsgType>
43 class IPCTunnel {
44 
46 
47  struct InternalSharedData {
48  volatile uint32_t expectedChildren;
49  size_t shmSegSize;
50  size_t numBuffers;
51  size_t offsets[0]; // Actual size: numBuffers + 2
52  };
53 
54 
55 public:
56  /**
57  * Construct a new Tunnel for IPC Communications
58  * @param comp_id Component ID of owner
59  * @param numBuffers Number of buffers for which we should tunnel
60  * @param bufferSize How large each core's buffer should be
61  */
62  IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren = 1) : master(true), shmPtr(nullptr), fd(-1)
63  {
64  char key[256];
65  memset(key, '\0', sizeof(key));
66  do {
67  snprintf(key, sizeof(key), "/sst_shmem_%u-%" PRIu32 "-%d", getpid(), comp_id, rand());
68  filename = key;
69 
70  fd = shm_open(filename.c_str(), O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
71  /* There's a rare chance that a file we are looking to use exists.
72  * It's unlikely, but perhaps a previous run (with the same PID
73  * and random number) crashed before the * clients all connected.
74  *
75  * So, if we get an error, and the error is EEXIST, try again with
76  * a different random number.
77  */
78  } while ( (fd < 0) && (errno == EEXIST) );
79  if ( fd < 0 ) {
80  // Not using Output because IPC means Output might not be available
81  fprintf(stderr, "Failed to create IPC region '%s': %s\n", filename.c_str(), strerror(errno));
82  exit(1);
83  }
84 
85 
86  shmSize = calculateShmemSize(numBuffers, bufferSize);
87  if ( ftruncate(fd, shmSize) ) {
88  // Not using Output because IPC means Output might not be available
89  fprintf(stderr, "Resizing shared file '%s' failed: %s\n", filename.c_str(), strerror(errno));
90  exit(1);
91  }
92 
93  shmPtr = mmap(nullptr, shmSize, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
94  if ( shmPtr == MAP_FAILED ) {
95  // Not using Output because IPC means Output might not be available
96  fprintf(stderr, "mmap failed: %s\n", strerror(errno));
97  exit(1);
98  }
99  nextAllocPtr = (uint8_t*)shmPtr;
100  memset(shmPtr, '\0', shmSize);
101 
102  /* Construct our private buffer first. Used for our communications */
103  auto resResult = reserveSpace<InternalSharedData>((1+numBuffers)*sizeof(size_t));
104  isd = resResult.second;
105  isd->expectedChildren = expectedChildren;
106  isd->shmSegSize = shmSize;
107  isd->numBuffers = numBuffers;
108 
109  /* Construct user's shared-data region */
110  auto shareResult = reserveSpace<ShareDataType>(0);
111  isd->offsets[0] = shareResult.first;
112  sharedData = shareResult.second;
113 
114  /* Construct the circular buffers */
115  const size_t cbSize = sizeof(MsgType) * bufferSize;
116  for ( size_t c = 0 ; c < isd->numBuffers ; c++ ) {
117  CircBuff_t* cPtr = nullptr;
118 
119  auto resResult = reserveSpace<CircBuff_t>(cbSize);
120  isd->offsets[1+c] = resResult.first;
121  cPtr = resResult.second;
122  if (!cPtr->setBufferSize(bufferSize))
123  exit(1);
124  circBuffs.push_back(cPtr);
125  }
126 
127  }
128 
129  /**
130  * Access an existing Tunnel
131  * @param region_name Name of the shared-memory region to access
132  */
133  IPCTunnel(const std::string& region_name) : master(false), shmPtr(nullptr), fd(-1)
134  {
135  fd = shm_open(region_name.c_str(), O_RDWR, S_IRUSR|S_IWUSR);
136  filename = region_name;
137 
138  if ( fd < 0 ) {
139  // Not using Output because IPC means Output might not be available
140  fprintf(stderr, "Failed to open IPC region '%s': %s\n",
141  filename.c_str(), strerror(errno));
142  exit(1);
143  }
144 
145  shmPtr = mmap(nullptr, sizeof(InternalSharedData), PROT_READ, MAP_SHARED, fd, 0);
146  if ( shmPtr == MAP_FAILED ) {
147  // Not using Output because IPC means Output might not be available
148  fprintf(stderr, "mmap 0 failed: %s\n", strerror(errno));
149  exit(1);
150  }
151 
152  isd = (InternalSharedData*)shmPtr;
153  shmSize = isd->shmSegSize;
154  munmap(shmPtr, sizeof(InternalSharedData));
155 
156  shmPtr = mmap(nullptr, shmSize, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
157  if ( shmPtr == MAP_FAILED ) {
158  // Not using Output because IPC means Output might not be available
159  fprintf(stderr, "mmap 1 failed: %s\n", strerror(errno));
160  exit(1);
161  }
162  isd = (InternalSharedData*)shmPtr;
163  sharedData = (ShareDataType*)((uint8_t*)shmPtr + isd->offsets[0]);
164 
165  for ( size_t c = 0 ; c < isd->numBuffers ; c++ ) {
166  circBuffs.push_back((CircBuff_t*)((uint8_t*)shmPtr + isd->offsets[c+1]));
167  }
168 
169  /* Clean up if we're the last to attach */
170  if ( --isd->expectedChildren == 0 ) {
171  shm_unlink(filename.c_str());
172  }
173  }
174 
175 
176  /**
177  * Destructor
178  */
179  virtual ~IPCTunnel()
180  {
181  shutdown(true);
182  }
183 
184  /**
185  * Shutdown
186  */
187  void shutdown(bool all = false)
188  {
189  if ( master ) {
190  for ( CircBuff_t *cb : circBuffs ) {
191  cb->~CircBuff_t();
192  }
193  }
194  if ( shmPtr ) {
195  munmap(shmPtr, shmSize);
196  shmPtr = nullptr;
197  shmSize = 0;
198  }
199  if ( fd >= 0 ) {
200  close(fd);
201  fd = -1;
202  }
203  }
204 
205  const std::string& getRegionName(void) const { return filename; }
206 
207  /** return a pointer to the ShareDataType region */
208  ShareDataType* getSharedData() { return sharedData; }
209 
210  /** Blocks until space is available **/
211  void writeMessage(size_t core, const MsgType &command) {
212  circBuffs[core]->write(command);
213  }
214 
215  /** Blocks until a command is available **/
216  MsgType readMessage(size_t buffer) {
217  return circBuffs[buffer]->read();
218  }
219 
220  /** Non-blocking version of readMessage **/
221  bool readMessageNB(size_t buffer, MsgType *result) {
222  return circBuffs[buffer]->readNB(result);
223  }
224 
225  /** Empty the messages in the buffer **/
226  void clearBuffer(size_t core) {
227  circBuffs[core]->clearBuffer();
228  }
229 
230 
231 private:
232  template <typename T>
233  std::pair<size_t, T*> reserveSpace(size_t extraSpace = 0)
234  {
235  size_t space = sizeof(T) + extraSpace;
236  if ( ((nextAllocPtr + space) - (uint8_t*)shmPtr) > shmSize )
237  return std::make_pair<size_t, T*>(0, nullptr);
238  T* ptr = (T*)nextAllocPtr;
239  nextAllocPtr += space;
240  new (ptr) T(); // Call constructor if need be
241  return std::make_pair((uint8_t*)ptr - (uint8_t*)shmPtr, ptr);
242  }
243 
244  size_t static calculateShmemSize(size_t numBuffers, size_t bufferSize)
245  {
246  long page_size = sysconf(_SC_PAGESIZE);
247 
248  /* Count how many pages are needed, at minimum */
249  size_t isd = 1 + ((sizeof(InternalSharedData) + (1+numBuffers)*sizeof(size_t)) / page_size);
250  size_t buffer = 1+ ((sizeof(CircBuff_t) +
251  bufferSize*sizeof(MsgType)) / page_size);
252  size_t shdata = 1+ ((sizeof(ShareDataType) + sizeof(InternalSharedData)) / page_size);
253 
254  /* Alloc 2 extra pages, just in case */
255  return (2 + isd + shdata + numBuffers*buffer) * page_size;
256  }
257 
258 protected:
259  /** Pointer to the Shared Data Region */
260  ShareDataType *sharedData;
261 
262 private:
263  bool master;
264  void *shmPtr;
265  int fd;
266 
267  std::string filename;
268  uint8_t *nextAllocPtr;
269  size_t shmSize;
270  InternalSharedData *isd;
271  std::vector<CircBuff_t* > circBuffs;
272 
273 };
274 
275 }
276 }
277 }
278 
279 
280 
281 #endif
void shutdown(bool all=false)
Shutdown.
Definition: ipctunnel.h:187
IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren=1)
Construct a new Tunnel for IPC Communications.
Definition: ipctunnel.h:62
MsgType readMessage(size_t buffer)
Blocks until a command is available.
Definition: ipctunnel.h:216
Definition: circularBuffer.h:22
void writeMessage(size_t core, const MsgType &command)
Blocks until space is available.
Definition: ipctunnel.h:211
ShareDataType * sharedData
Pointer to the Shared Data Region.
Definition: ipctunnel.h:260
virtual ~IPCTunnel()
Destructor.
Definition: ipctunnel.h:179
bool readMessageNB(size_t buffer, MsgType *result)
Non-blocking version of readMessage.
Definition: ipctunnel.h:221
Tunneling class between two processes, connected by shared memory.
Definition: ipctunnel.h:43
ShareDataType * getSharedData()
return a pointer to the ShareDataType region
Definition: ipctunnel.h:208
IPCTunnel(const std::string &region_name)
Access an existing Tunnel.
Definition: ipctunnel.h:133
void clearBuffer(size_t core)
Empty the messages in the buffer.
Definition: ipctunnel.h:226