SST  12.0.0
StructuralSimulationToolkit
ipctunnel.h
1 // Copyright 2009-2022 NTESS. Under the terms
2 // of Contract DE-NA0003525 with NTESS, the U.S.
3 // Government retains certain rights in this software.
4 //
5 // Copyright (c) 2009-2022, NTESS
6 // All rights reserved.
7 //
8 // This file is part of the SST software package. For license
9 // information, see the LICENSE file in the top level directory of the
10 // distribution.
11 
12 #ifndef SST_CORE_INTERPROCESS_IPCTUNNEL_H
13 #define SST_CORE_INTERPROCESS_IPCTUNNEL_H
14 
15 #include "sst/core/interprocess/circularBuffer.h"
16 
17 #include <cstdio>
18 #include <cstring>
19 #include <errno.h>
20 #include <fcntl.h>
21 #include <string>
22 #include <sys/mman.h>
23 #include <sys/stat.h>
24 #include <unistd.h>
25 #include <vector>
26 
27 namespace SST {
28 namespace Core {
29 namespace Interprocess {
30 
// Declared here and defined in some other translation unit; nothing in this header reads or writes it.
// NOTE(review): presumably a process-wide count of tunnels -- confirm against the definition.
31 extern uint32_t globalIPCTunnelCount;
32 /**
33  * Tunneling class between two processes, connected by shared memory.
34  * Supports multiple circular-buffer queues, and a generic region
35  * of memory for shared data.
36  *
37  * @tparam ShareDataType Type to put in the shared data region
38  * @tparam MsgType Type of messages being sent in the circular buffers
39  */
40 template <typename ShareDataType, typename MsgType>
41 class IPCTunnel
42 {
43 
45 
46  struct InternalSharedData
47  {
48  volatile uint32_t expectedChildren;
49  size_t shmSegSize;
50  size_t numBuffers;
51  size_t offsets[0]; // Actual size: numBuffers + 2
52  };
53 
54 public:
55  /**
56  * Construct a new Tunnel for IPC Communications
57  * @param comp_id Component ID of owner
58  * @param numBuffers Number of buffers for which we should tunnel
59  * @param bufferSize How large each core's buffer should be
60  */
61  IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren = 1) :
62  master(true),
63  shmPtr(nullptr),
64  fd(-1)
65  {
66  char key[256];
67  memset(key, '\0', sizeof(key));
68  do {
69  snprintf(key, sizeof(key), "/sst_shmem_%u-%" PRIu32 "-%d", getpid(), comp_id, rand());
70  filename = key;
71 
72  fd = shm_open(filename.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
73  /* There's a rare chance that a file we are looking to use exists.
74  * It's unlikely, but perhaps a previous run (with the same PID
75  * and random number) crashed before the * clients all connected.
76  *
77  * So, if we get an error, and the error is EEXIST, try again with
78  * a different random number.
79  */
80  } while ( (fd < 0) && (errno == EEXIST) );
81  if ( fd < 0 ) {
82  // Not using Output because IPC means Output might not be available
83  fprintf(stderr, "Failed to create IPC region '%s': %s\n", filename.c_str(), strerror(errno));
84  exit(1);
85  }
86 
87  shmSize = calculateShmemSize(numBuffers, bufferSize);
88  if ( ftruncate(fd, shmSize) ) {
89  // Not using Output because IPC means Output might not be available
90  fprintf(stderr, "Resizing shared file '%s' failed: %s\n", filename.c_str(), strerror(errno));
91  exit(1);
92  }
93 
94  shmPtr = mmap(nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
95  if ( shmPtr == MAP_FAILED ) {
96  // Not using Output because IPC means Output might not be available
97  fprintf(stderr, "mmap failed: %s\n", strerror(errno));
98  exit(1);
99  }
100  nextAllocPtr = (uint8_t*)shmPtr;
101  memset(shmPtr, '\0', shmSize);
102 
103  /* Construct our private buffer first. Used for our communications */
104  auto resResult = reserveSpace<InternalSharedData>((1 + numBuffers) * sizeof(size_t));
105  isd = resResult.second;
106  isd->expectedChildren = expectedChildren;
107  isd->shmSegSize = shmSize;
108  isd->numBuffers = numBuffers;
109 
110  /* Construct user's shared-data region */
111  auto shareResult = reserveSpace<ShareDataType>(0);
112  isd->offsets[0] = shareResult.first;
113  sharedData = shareResult.second;
114 
115  /* Construct the circular buffers */
116  const size_t cbSize = sizeof(MsgType) * bufferSize;
117  for ( size_t c = 0; c < isd->numBuffers; c++ ) {
118  CircBuff_t* cPtr = nullptr;
119 
120  auto resResult = reserveSpace<CircBuff_t>(cbSize);
121  isd->offsets[1 + c] = resResult.first;
122  cPtr = resResult.second;
123  if ( !cPtr->setBufferSize(bufferSize) ) exit(1);
124  circBuffs.push_back(cPtr);
125  }
126  }
127 
128  /**
129  * Access an existing Tunnel
130  * @param region_name Name of the shared-memory region to access
131  */
132  IPCTunnel(const std::string& region_name) : master(false), shmPtr(nullptr), fd(-1)
133  {
134  fd = shm_open(region_name.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
135  filename = region_name;
136 
137  if ( fd < 0 ) {
138  // Not using Output because IPC means Output might not be available
139  fprintf(stderr, "Failed to open IPC region '%s': %s\n", filename.c_str(), strerror(errno));
140  exit(1);
141  }
142 
143  shmPtr = mmap(nullptr, sizeof(InternalSharedData), PROT_READ, MAP_SHARED, fd, 0);
144  if ( shmPtr == MAP_FAILED ) {
145  // Not using Output because IPC means Output might not be available
146  fprintf(stderr, "mmap 0 failed: %s\n", strerror(errno));
147  exit(1);
148  }
149 
150  isd = (InternalSharedData*)shmPtr;
151  shmSize = isd->shmSegSize;
152  munmap(shmPtr, sizeof(InternalSharedData));
153 
154  shmPtr = mmap(nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
155  if ( shmPtr == MAP_FAILED ) {
156  // Not using Output because IPC means Output might not be available
157  fprintf(stderr, "mmap 1 failed: %s\n", strerror(errno));
158  exit(1);
159  }
160  isd = (InternalSharedData*)shmPtr;
161  sharedData = (ShareDataType*)((uint8_t*)shmPtr + isd->offsets[0]);
162 
163  for ( size_t c = 0; c < isd->numBuffers; c++ ) {
164  circBuffs.push_back((CircBuff_t*)((uint8_t*)shmPtr + isd->offsets[c + 1]));
165  }
166 
167  /* Clean up if we're the last to attach */
168  if ( --isd->expectedChildren == 0 ) { shm_unlink(filename.c_str()); }
169  }
170 
171  /**
172  * Destructor
173  */
174  virtual ~IPCTunnel() { shutdown(true); }
175 
176  /**
177  * Shutdown
178  */
179  void shutdown(bool all = false)
180  {
181  if ( master ) {
182  for ( CircBuff_t* cb : circBuffs ) {
183  cb->~CircBuff_t();
184  }
185  }
186  if ( shmPtr ) {
187  munmap(shmPtr, shmSize);
188  shmPtr = nullptr;
189  shmSize = 0;
190  }
191  if ( fd >= 0 ) {
192  close(fd);
193  fd = -1;
194  }
195  }
196 
197  const std::string& getRegionName(void) const { return filename; }
198 
199  /** return a pointer to the ShareDataType region */
200  ShareDataType* getSharedData() { return sharedData; }
201 
202  /** Blocks until space is available **/
203  void writeMessage(size_t core, const MsgType& command) { circBuffs[core]->write(command); }
204 
205  /** Blocks until a command is available **/
206  MsgType readMessage(size_t buffer) { return circBuffs[buffer]->read(); }
207 
208  /** Non-blocking version of readMessage **/
209  bool readMessageNB(size_t buffer, MsgType* result) { return circBuffs[buffer]->readNB(result); }
210 
211  /** Empty the messages in the buffer **/
212  void clearBuffer(size_t core) { circBuffs[core]->clearBuffer(); }
213 
214 private:
215  template <typename T>
216  std::pair<size_t, T*> reserveSpace(size_t extraSpace = 0)
217  {
218  size_t space = sizeof(T) + extraSpace;
219  if ( ((nextAllocPtr + space) - (uint8_t*)shmPtr) > shmSize ) return std::make_pair<size_t, T*>(0, nullptr);
220  T* ptr = (T*)nextAllocPtr;
221  nextAllocPtr += space;
222  new (ptr) T(); // Call constructor if need be
223  return std::make_pair((uint8_t*)ptr - (uint8_t*)shmPtr, ptr);
224  }
225 
226  size_t static calculateShmemSize(size_t numBuffers, size_t bufferSize)
227  {
228  long page_size = sysconf(_SC_PAGESIZE);
229 
230  /* Count how many pages are needed, at minimum */
231  size_t isd = 1 + ((sizeof(InternalSharedData) + (1 + numBuffers) * sizeof(size_t)) / page_size);
232  size_t buffer = 1 + ((sizeof(CircBuff_t) + bufferSize * sizeof(MsgType)) / page_size);
233  size_t shdata = 1 + ((sizeof(ShareDataType) + sizeof(InternalSharedData)) / page_size);
234 
235  /* Alloc 2 extra pages, just in case */
236  return (2 + isd + shdata + numBuffers * buffer) * page_size;
237  }
238 
239 protected:
240  /** Pointer to the Shared Data Region */
241  ShareDataType* sharedData;
242 
243 private:
244  bool master;
245  void* shmPtr;
246  int fd;
247 
248  std::string filename;
249  uint8_t* nextAllocPtr;
250  size_t shmSize;
251  InternalSharedData* isd;
252  std::vector<CircBuff_t*> circBuffs;
253 };
254 
255 } // namespace Interprocess
256 } // namespace Core
257 } // namespace SST
258 
259 #endif // SST_CORE_INTERPROCESS_IPCTUNNEL_H
void shutdown(bool all=false)
Shutdown.
Definition: ipctunnel.h:179
IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren=1)
Construct a new Tunnel for IPC Communications.
Definition: ipctunnel.h:61
MsgType readMessage(size_t buffer)
Blocks until a command is available.
Definition: ipctunnel.h:206
Definition: circularBuffer.h:22
void writeMessage(size_t core, const MsgType &command)
Blocks until space is available.
Definition: ipctunnel.h:203
ShareDataType * sharedData
Pointer to the Shared Data Region.
Definition: ipctunnel.h:241
virtual ~IPCTunnel()
Destructor.
Definition: ipctunnel.h:174
bool readMessageNB(size_t buffer, MsgType *result)
Non-blocking version of readMessage.
Definition: ipctunnel.h:209
Tunneling class between two processes, connected by shared memory.
Definition: ipctunnel.h:41
ShareDataType * getSharedData()
return a pointer to the ShareDataType region
Definition: ipctunnel.h:200
IPCTunnel(const std::string &region_name)
Access an existing Tunnel.
Definition: ipctunnel.h:132
void clearBuffer(size_t core)
Empty the messages in the buffer.
Definition: ipctunnel.h:212