SST 12.1.0
Structural Simulation Toolkit
ipctunnel.h
1// Copyright 2009-2022 NTESS. Under the terms
2// of Contract DE-NA0003525 with NTESS, the U.S.
3// Government retains certain rights in this software.
4//
5// Copyright (c) 2009-2022, NTESS
6// All rights reserved.
7//
8// This file is part of the SST software package. For license
9// information, see the LICENSE file in the top level directory of the
10// distribution.
11
12#ifndef SST_CORE_INTERPROCESS_IPCTUNNEL_H
13#define SST_CORE_INTERPROCESS_IPCTUNNEL_H
14
15#include "sst/core/interprocess/circularBuffer.h"
16
17#include <cstdio>
18#include <cstring>
19#include <errno.h>
20#include <fcntl.h>
21#include <string>
22#include <sys/mman.h>
23#include <sys/stat.h>
24#include <unistd.h>
25#include <vector>
26
27namespace SST {
28namespace Core {
29namespace Interprocess {
30
31extern uint32_t globalIPCTunnelCount;
32/**
33 * Tunneling class between two processes, connected by shared memory.
34 * Supports multiple circular-buffer queues, and a generic region
35 * of memory for shared data.
36 *
37 * @tparam ShareDataType Type to put in the shared data region
38 * @tparam MsgType Type of messages being sent in the circular buffers
39 */
40template <typename ShareDataType, typename MsgType>
42{
43
45
/**
 * Bookkeeping header placed at the very start of the shared-memory segment.
 * The creating process fills it in; attaching processes read it to find
 * the segment size and the location of every other region.
 */
struct InternalSharedData
{
    /** Attach countdown: set by the creator, decremented by each attaching process */
    volatile uint32_t expectedChildren;

    /** Total size of the shared-memory segment, in bytes */
    size_t shmSegSize;

    /** Number of circular buffers laid out in the segment */
    size_t numBuffers;

    /**
     * Byte offsets measured from the start of the segment.
     * offsets[0] locates the user's shared-data region and offsets[1 + c]
     * locates circular buffer c. Declared with zero length; the creating
     * constructor reserves room for (1 + numBuffers) entries behind the struct.
     */
    size_t offsets[0];
};
53
54public:
55 /**
56 * Construct a new Tunnel for IPC Communications
57 * @param comp_id Component ID of owner
58 * @param numBuffers Number of buffers for which we should tunnel
59 * @param bufferSize How large each core's buffer should be
60 */
61 IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren = 1) :
62 master(true),
63 shmPtr(nullptr),
64 fd(-1)
65 {
66 char key[256];
67 memset(key, '\0', sizeof(key));
68 do {
69 snprintf(key, sizeof(key), "/sst_shmem_%u-%" PRIu32 "-%d", getpid(), comp_id, rand());
70 filename = key;
71
72 fd = shm_open(filename.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
73 /* There's a rare chance that a file we are looking to use exists.
74 * It's unlikely, but perhaps a previous run (with the same PID
75 * and random number) crashed before the * clients all connected.
76 *
77 * So, if we get an error, and the error is EEXIST, try again with
78 * a different random number.
79 */
80 } while ( (fd < 0) && (errno == EEXIST) );
81 if ( fd < 0 ) {
82 // Not using Output because IPC means Output might not be available
83 fprintf(stderr, "Failed to create IPC region '%s': %s\n", filename.c_str(), strerror(errno));
84 exit(1);
85 }
86
87 shmSize = calculateShmemSize(numBuffers, bufferSize);
88 if ( ftruncate(fd, shmSize) ) {
89 // Not using Output because IPC means Output might not be available
90 fprintf(stderr, "Resizing shared file '%s' failed: %s\n", filename.c_str(), strerror(errno));
91 exit(1);
92 }
93
94 shmPtr = mmap(nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
95 if ( shmPtr == MAP_FAILED ) {
96 // Not using Output because IPC means Output might not be available
97 fprintf(stderr, "mmap failed: %s\n", strerror(errno));
98 exit(1);
99 }
100 nextAllocPtr = (uint8_t*)shmPtr;
101 memset(shmPtr, '\0', shmSize);
102
103 /* Construct our private buffer first. Used for our communications */
104 auto resResult = reserveSpace<InternalSharedData>((1 + numBuffers) * sizeof(size_t));
105 isd = resResult.second;
106 isd->expectedChildren = expectedChildren;
107 isd->shmSegSize = shmSize;
108 isd->numBuffers = numBuffers;
109
110 /* Construct user's shared-data region */
111 auto shareResult = reserveSpace<ShareDataType>(0);
112 isd->offsets[0] = shareResult.first;
113 sharedData = shareResult.second;
114
115 /* Construct the circular buffers */
116 const size_t cbSize = sizeof(MsgType) * bufferSize;
117 for ( size_t c = 0; c < isd->numBuffers; c++ ) {
118 CircBuff_t* cPtr = nullptr;
119
120 auto resResult = reserveSpace<CircBuff_t>(cbSize);
121 isd->offsets[1 + c] = resResult.first;
122 cPtr = resResult.second;
123 if ( !cPtr->setBufferSize(bufferSize) ) exit(1);
124 circBuffs.push_back(cPtr);
125 }
126 }
127
128 /**
129 * Access an existing Tunnel
130 * @param region_name Name of the shared-memory region to access
131 */
132 IPCTunnel(const std::string& region_name) : master(false), shmPtr(nullptr), fd(-1)
133 {
134 fd = shm_open(region_name.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
135 filename = region_name;
136
137 if ( fd < 0 ) {
138 // Not using Output because IPC means Output might not be available
139 fprintf(stderr, "Failed to open IPC region '%s': %s\n", filename.c_str(), strerror(errno));
140 exit(1);
141 }
142
143 shmPtr = mmap(nullptr, sizeof(InternalSharedData), PROT_READ, MAP_SHARED, fd, 0);
144 if ( shmPtr == MAP_FAILED ) {
145 // Not using Output because IPC means Output might not be available
146 fprintf(stderr, "mmap 0 failed: %s\n", strerror(errno));
147 exit(1);
148 }
149
150 isd = (InternalSharedData*)shmPtr;
151 shmSize = isd->shmSegSize;
152 munmap(shmPtr, sizeof(InternalSharedData));
153
154 shmPtr = mmap(nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
155 if ( shmPtr == MAP_FAILED ) {
156 // Not using Output because IPC means Output might not be available
157 fprintf(stderr, "mmap 1 failed: %s\n", strerror(errno));
158 exit(1);
159 }
160 isd = (InternalSharedData*)shmPtr;
161 sharedData = (ShareDataType*)((uint8_t*)shmPtr + isd->offsets[0]);
162
163 for ( size_t c = 0; c < isd->numBuffers; c++ ) {
164 circBuffs.push_back((CircBuff_t*)((uint8_t*)shmPtr + isd->offsets[c + 1]));
165 }
166
167 /* Clean up if we're the last to attach */
168 if ( --isd->expectedChildren == 0 ) { shm_unlink(filename.c_str()); }
169 }
170
171 /**
172 * Destructor
173 */
174 virtual ~IPCTunnel() { shutdown(true); }
175
176 /**
177 * Shutdown
178 */
179 void shutdown(bool all = false)
180 {
181 if ( master ) {
182 for ( CircBuff_t* cb : circBuffs ) {
183 cb->~CircBuff_t();
184 }
185 }
186 if ( shmPtr ) {
187 munmap(shmPtr, shmSize);
188 shmPtr = nullptr;
189 shmSize = 0;
190 }
191 if ( fd >= 0 ) {
192 close(fd);
193 fd = -1;
194 }
195 }
196
197 const std::string& getRegionName(void) const { return filename; }
198
199 /** return a pointer to the ShareDataType region */
200 ShareDataType* getSharedData() { return sharedData; }
201
202 /** Blocks until space is available **/
203 void writeMessage(size_t core, const MsgType& command) { circBuffs[core]->write(command); }
204
205 /** Blocks until a command is available **/
206 MsgType readMessage(size_t buffer) { return circBuffs[buffer]->read(); }
207
208 /** Non-blocking version of readMessage **/
209 bool readMessageNB(size_t buffer, MsgType* result) { return circBuffs[buffer]->readNB(result); }
210
211 /** Empty the messages in the buffer **/
212 void clearBuffer(size_t core) { circBuffs[core]->clearBuffer(); }
213
214private:
215 template <typename T>
216 std::pair<size_t, T*> reserveSpace(size_t extraSpace = 0)
217 {
218 size_t space = sizeof(T) + extraSpace;
219 if ( ((nextAllocPtr + space) - (uint8_t*)shmPtr) > shmSize ) return std::make_pair<size_t, T*>(0, nullptr);
220 T* ptr = (T*)nextAllocPtr;
221 nextAllocPtr += space;
222 new (ptr) T(); // Call constructor if need be
223 return std::make_pair((uint8_t*)ptr - (uint8_t*)shmPtr, ptr);
224 }
225
226 size_t static calculateShmemSize(size_t numBuffers, size_t bufferSize)
227 {
228 long page_size = sysconf(_SC_PAGESIZE);
229
230 /* Count how many pages are needed, at minimum */
231 size_t isd = 1 + ((sizeof(InternalSharedData) + (1 + numBuffers) * sizeof(size_t)) / page_size);
232 size_t buffer = 1 + ((sizeof(CircBuff_t) + bufferSize * sizeof(MsgType)) / page_size);
233 size_t shdata = 1 + ((sizeof(ShareDataType) + sizeof(InternalSharedData)) / page_size);
234
235 /* Alloc 2 extra pages, just in case */
236 return (2 + isd + shdata + numBuffers * buffer) * page_size;
237 }
238
239protected:
240 /** Pointer to the Shared Data Region */
241 ShareDataType* sharedData;
242
243private:
244 bool master;
245 void* shmPtr;
246 int fd;
247
248 std::string filename;
249 uint8_t* nextAllocPtr;
250 size_t shmSize;
251 InternalSharedData* isd;
252 std::vector<CircBuff_t*> circBuffs;
253};
254
255} // namespace Interprocess
256} // namespace Core
257} // namespace SST
258
259#endif // SST_CORE_INTERPROCESS_IPCTUNNEL_H
Definition: circularBuffer.h:23
Tunneling class between two processes, connected by shared memory.
Definition: ipctunnel.h:42
ShareDataType * getSharedData()
return a pointer to the ShareDataType region
Definition: ipctunnel.h:200
virtual ~IPCTunnel()
Destructor.
Definition: ipctunnel.h:174
void clearBuffer(size_t core)
Empty the messages in the buffer.
Definition: ipctunnel.h:212
IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren=1)
Construct a new Tunnel for IPC Communications.
Definition: ipctunnel.h:61
void shutdown(bool all=false)
Shutdown.
Definition: ipctunnel.h:179
void writeMessage(size_t core, const MsgType &command)
Blocks until space is available.
Definition: ipctunnel.h:203
IPCTunnel(const std::string &region_name)
Access an existing Tunnel.
Definition: ipctunnel.h:132
MsgType readMessage(size_t buffer)
Blocks until a command is available.
Definition: ipctunnel.h:206
bool readMessageNB(size_t buffer, MsgType *result)
Non-blocking version of readMessage.
Definition: ipctunnel.h:209
ShareDataType * sharedData
Pointer to the Shared Data Region.
Definition: ipctunnel.h:241