SST  7.1.0
StructuralSimulationToolkit
ipctunnel.h
1 // Copyright 2009-2017 Sandia Corporation. Under the terms
2 // of Contract DE-NA0003525 with Sandia Corporation, the U.S.
3 // Government retains certain rights in this software.
4 //
5 // Copyright (c) 2009-2017, Sandia Corporation
6 // All rights reserved.
7 //
8 // This file is part of the SST software package. For license
9 // information, see the LICENSE file in the top level directory of the
10 // distribution.
11 
12 #ifndef SST_CORE_INTERPROCESS_TUNNEL_H
13 #define SST_CORE_INTERPROCESS_TUNNEL_H 1
14 
15 
16 #include <fcntl.h>
17 #include <cstdio>
18 #include <vector>
19 #include <string>
20 #include <errno.h>
21 #include <cstring>
22 #include <sys/mman.h>
23 #include <sys/stat.h>
24 #include <unistd.h>
25 
26 #include <sst/core/interprocess/circularBuffer.h>
27 
28 namespace SST {
29 namespace Core {
30 namespace Interprocess {
31 
32 
33 extern uint32_t globalIPCTunnelCount;
34 /**
35  * Tunneling class between two processes, connected by shared memory.
36  * Supports multiple circular-buffer queues, and a generic region
37  * of memory for shared data.
38  *
39  * @tparam ShareDataType Type to put in the shared data region
40  * @tparam MsgType Type of messages being sent in the circular buffers
41  */
42 template<typename ShareDataType, typename MsgType>
43 class IPCTunnel {
44 
46 
47  struct InternalSharedData {
48  volatile uint32_t expectedChildren;
49  size_t shmSegSize;
50  size_t numBuffers;
51  size_t offsets[0]; // Actual size: numBuffers + 2
52  };
53 
54 
55 public:
56  /**
57  * Construct a new Tunnel for IPC Communications
58  * @param comp_id Component ID of owner
59  * @param numBuffers Number of buffers for which we should tunnel
60  * @param bufferSize How large each core's buffer should be
61  */
62  IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren = 1) : master(true), shmPtr(NULL), fd(-1)
63  {
64  char key[256];
65  memset(key, '\0', sizeof(key));
66  do {
67  snprintf(key, sizeof(key), "/sst_shmem_%u-%" PRIu32 "-%d", getpid(), comp_id, rand());
68  filename = key;
69 
70  fd = shm_open(filename.c_str(), O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
71  /* There's a rare chance that a file we are looking to use exists.
72  * It's unlikely, but perhaps a previous run (with the same PID
73  * and random number) crashed before the * clients all connected.
74  *
75  * So, if we get an error, and the error is EEXIST, try again with
76  * a different random number.
77  */
78  } while ( (fd < 0) && (errno == EEXIST) );
79  if ( fd < 0 ) {
80  // Not using Output because IPC means Output might not be available
81  fprintf(stderr, "Failed to create IPC region '%s': %s\n", filename.c_str(), strerror(errno));
82  exit(1);
83  }
84 
85 
86  shmSize = calculateShmemSize(numBuffers, bufferSize);
87  if ( ftruncate(fd, shmSize) ) {
88  // Not using Output because IPC means Output might not be available
89  fprintf(stderr, "Resizing shared file '%s' failed: %s\n", filename.c_str(), strerror(errno));
90  exit(1);
91  }
92 
93  shmPtr = mmap(NULL, shmSize, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
94  if ( shmPtr == MAP_FAILED ) {
95  // Not using Output because IPC means Output might not be available
96  fprintf(stderr, "mmap failed: %s\n", strerror(errno));
97  exit(1);
98  }
99  nextAllocPtr = (uint8_t*)shmPtr;
100  memset(shmPtr, '\0', shmSize);
101 
102  /* Construct our private buffer first. Used for our communications */
103  auto resResult = reserveSpace<InternalSharedData>((1+numBuffers)*sizeof(size_t));
104  isd = resResult.second;
105  isd->expectedChildren = expectedChildren;
106  isd->shmSegSize = shmSize;
107  isd->numBuffers = numBuffers;
108 
109  /* Construct user's shared-data region */
110  auto shareResult = reserveSpace<ShareDataType>(0);
111  isd->offsets[0] = shareResult.first;
112  sharedData = shareResult.second;
113 
114  /* Construct the circular buffers */
115  const size_t cbSize = sizeof(MsgType) * bufferSize;
116  for ( size_t c = 0 ; c < isd->numBuffers ; c++ ) {
117  CircBuff_t* cPtr = NULL;
118 
119  auto resResult = reserveSpace<CircBuff_t>(cbSize);
120  isd->offsets[1+c] = resResult.first;
121  cPtr = resResult.second;
122  cPtr->setBufferSize(bufferSize);
123  circBuffs.push_back(cPtr);
124  }
125 
126  }
127 
128  /**
129  * Access an existing Tunnel
130  * @param region_name Name of the shared-memory region to access
131  */
132  IPCTunnel(const std::string &region_name) : master(false), shmPtr(NULL), fd(-1)
133  {
134  fd = shm_open(region_name.c_str(), O_RDWR, S_IRUSR|S_IWUSR);
135  filename = region_name;
136 
137  if ( fd < 0 ) {
138  // Not using Output because IPC means Output might not be available
139  fprintf(stderr, "Failed to open IPC region '%s': %s\n",
140  filename.c_str(), strerror(errno));
141  exit(1);
142  }
143 
144  shmPtr = mmap(NULL, sizeof(InternalSharedData), PROT_READ, MAP_SHARED, fd, 0);
145  if ( shmPtr == MAP_FAILED ) {
146  // Not using Output because IPC means Output might not be available
147  fprintf(stderr, "mmap 0 failed: %s\n", strerror(errno));
148  exit(1);
149  }
150 
151  isd = (InternalSharedData*)shmPtr;
152  shmSize = isd->shmSegSize;
153  munmap(shmPtr, sizeof(InternalSharedData));
154 
155  shmPtr = mmap(NULL, shmSize, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
156  if ( shmPtr == MAP_FAILED ) {
157  // Not using Output because IPC means Output might not be available
158  fprintf(stderr, "mmap 1 failed: %s\n", strerror(errno));
159  exit(1);
160  }
161  isd = (InternalSharedData*)shmPtr;
162  sharedData = (ShareDataType*)((uint8_t*)shmPtr + isd->offsets[0]);
163 
164  for ( size_t c = 0 ; c < isd->numBuffers ; c++ ) {
165  circBuffs.push_back((CircBuff_t*)((uint8_t*)shmPtr + isd->offsets[c+1]));
166  }
167 
168  /* Clean up if we're the last to attach */
169  if ( --isd->expectedChildren == 0 ) {
170  shm_unlink(filename.c_str());
171  }
172  }
173 
174 
175  /**
176  * Destructor
177  */
178  virtual ~IPCTunnel()
179  {
180  shutdown(true);
181  }
182 
183  /**
184  * Shutdown
185  */
186  void shutdown(bool all = false)
187  {
188  if ( master ) {
189  for ( CircBuff_t *cb : circBuffs ) {
190  cb->~CircBuff_t();
191  }
192  }
193  if ( shmPtr ) {
194  munmap(shmPtr, shmSize);
195  shmPtr = NULL;
196  shmSize = 0;
197  }
198  if ( fd >= 0 ) {
199  close(fd);
200  fd = -1;
201  }
202  }
203 
204  const std::string& getRegionName(void) const { return filename; }
205 
206  /** return a pointer to the ShareDataType region */
207  ShareDataType* getSharedData() { return sharedData; }
208 
209  /** Blocks until space is available **/
210  void writeMessage(size_t core, const MsgType &command) {
211  circBuffs[core]->write(command);
212  }
213 
214  /** Blocks until a command is available **/
215  MsgType readMessage(size_t buffer) {
216  return circBuffs[buffer]->read();
217  }
218 
219  /** Non-blocking version of readMessage **/
220  bool readMessageNB(size_t buffer, MsgType *result) {
221  return circBuffs[buffer]->readNB(result);
222  }
223 
224  /** Empty the messages in the buffer **/
225  void clearBuffer(size_t core) {
226  circBuffs[core]->clearBuffer();
227  }
228 
229 
230 private:
231  template <typename T>
232  std::pair<size_t, T*> reserveSpace(size_t extraSpace = 0)
233  {
234  size_t space = sizeof(T) + extraSpace;
235  if ( ((nextAllocPtr + space) - (uint8_t*)shmPtr) > shmSize )
236  return std::make_pair<size_t, T*>(0, NULL);
237  T* ptr = (T*)nextAllocPtr;
238  nextAllocPtr += space;
239  new (ptr) T(); // Call constructor if need be
240  return std::make_pair((uint8_t*)ptr - (uint8_t*)shmPtr, ptr);
241  }
242 
243  size_t static calculateShmemSize(size_t numBuffers, size_t bufferSize)
244  {
245  long page_size = sysconf(_SC_PAGESIZE);
246 
247  /* Count how many pages are needed, at minimum */
248  size_t isd = 1 + ((sizeof(InternalSharedData) + (1+numBuffers)*sizeof(size_t)) / page_size);
249  size_t buffer = 1+ ((sizeof(CircBuff_t) +
250  bufferSize*sizeof(MsgType)) / page_size);
251  size_t shdata = 1+ ((sizeof(ShareDataType) + sizeof(InternalSharedData)) / page_size);
252 
253  /* Alloc 2 extra pages, just in case */
254  return (2 + isd + shdata + numBuffers*buffer) * page_size;
255  }
256 
257 protected:
258  /** Pointer to the Shared Data Region */
259  ShareDataType *sharedData;
260 
261 private:
262  bool master;
263  int fd;
264  std::string filename;
265  void *shmPtr;
266  uint8_t *nextAllocPtr;
267  size_t shmSize;
268  InternalSharedData *isd;
269  std::vector<CircBuff_t* > circBuffs;
270 
271 };
272 
273 }
274 }
275 }
276 
277 
278 
279 #endif
void shutdown(bool all=false)
Shutdown.
Definition: ipctunnel.h:186
Definition: action.cc:17
IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren=1)
Construct a new Tunnel for IPC Communications.
Definition: ipctunnel.h:62
MsgType readMessage(size_t buffer)
Blocks until a command is available.
Definition: ipctunnel.h:215
Definition: circularBuffer.h:12
void writeMessage(size_t core, const MsgType &command)
Blocks until space is available.
Definition: ipctunnel.h:210
ShareDataType * sharedData
Pointer to the Shared Data Region.
Definition: ipctunnel.h:259
virtual ~IPCTunnel()
Destructor.
Definition: ipctunnel.h:178
bool readMessageNB(size_t buffer, MsgType *result)
Non-blocking version of readMessage.
Definition: ipctunnel.h:220
Tunneling class between two processes, connected by shared memory.
Definition: ipctunnel.h:43
ShareDataType * getSharedData()
return a pointer to the ShareDataType region
Definition: ipctunnel.h:207
IPCTunnel(const std::string &region_name)
Access an existing Tunnel.
Definition: ipctunnel.h:132
void clearBuffer(size_t core)
Empty the messages in the buffer.
Definition: ipctunnel.h:225