SST 16.0.0
Structural Simulation Toolkit
ipctunnel.h
1// Copyright 2009-2026 NTESS. Under the terms
2// of Contract DE-NA0003525 with NTESS, the U.S.
3// Government retains certain rights in this software.
4//
5// Copyright (c) 2009-2026, NTESS
6// All rights reserved.
7//
8// This file is part of the SST software package. For license
9// information, see the LICENSE file in the top level directory of the
10// distribution.
11
12#ifndef SST_CORE_INTERPROCESS_IPCTUNNEL_H
13#define SST_CORE_INTERPROCESS_IPCTUNNEL_H
14
#include "sst/core/interprocess/circularBuffer.h"

#include <cerrno>
#include <cinttypes>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <errno.h>
#include <fcntl.h>
#include <new>
#include <string>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <utility>
#include <vector>
30
31namespace SST::Core::Interprocess {
32
33extern uint32_t globalIPCTunnelCount;
34/**
35 * Tunneling class between two processes, connected by shared memory.
36 * Supports multiple circular-buffer queues, and a generic region
37 * of memory for shared data.
38 *
39 * @tparam ShareDataType Type to put in the shared data region
40 * @tparam MsgType Type of messages being sent in the circular buffers
41 */
42template <typename ShareDataType, typename MsgType>
44{
45
47
// Bookkeeping header stored at the very start of the shared-memory mapping.
// The creating constructor fills it in; the attaching constructor reads it
// to size its own mapping and to locate the other regions by offset.
48 struct InternalSharedData
49 {
// Count of attaching processes still expected. The attaching constructor
// decrements it and unlinks the region name when it reaches zero.
50 volatile uint32_t expectedChildren;
// Total size, in bytes, of the shared-memory mapping.
51 size_t shmSegSize;
// Number of circular buffers laid out in the mapping.
52 size_t numBuffers;
// Byte offsets from the start of the mapping: offsets[0] locates the
// ShareDataType region, offsets[1 + c] locates circular buffer c.
53 size_t offsets[0]; // Storage reserved by the creating constructor: 1 + numBuffers entries
54 };
55
56public:
57 /**
58 * Construct a new Tunnel for IPC Communications
59 * @param comp_id Component ID of owner
60 * @param numBuffers Number of buffers for which we should tunnel
61 * @param bufferSize How large each core's buffer should be
62 */
63 IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren = 1) :
64 master(true),
65 shmPtr(nullptr),
66 fd(-1)
67 {
68 char key[256];
69 memset(key, '\0', sizeof(key));
70 do {
71 snprintf(key, sizeof(key), "/sst_shmem_%u-%" PRIu32 "-%d", getpid(), comp_id, rand());
72 filename = key;
73
74 fd = shm_open(filename.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
75 /* There's a rare chance that a file we are looking to use exists.
76 * It's unlikely, but perhaps a previous run (with the same PID
77 * and random number) crashed before the * clients all connected.
78 *
79 * So, if we get an error, and the error is EEXIST, try again with
80 * a different random number.
81 */
82 } while ( (fd < 0) && (errno == EEXIST) );
83 if ( fd < 0 ) {
84 // Not using Output because IPC means Output might not be available
85 fprintf(stderr, "Failed to create IPC region '%s': %s\n", filename.c_str(), strerror(errno));
86 exit(1);
87 }
88
89 shmSize = calculateShmemSize(numBuffers, bufferSize);
90 if ( ftruncate(fd, shmSize) ) {
91 // Not using Output because IPC means Output might not be available
92 fprintf(stderr, "Resizing shared file '%s' failed: %s\n", filename.c_str(), strerror(errno));
93 exit(1);
94 }
95
96 shmPtr = mmap(nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
97 if ( shmPtr == MAP_FAILED ) {
98 // Not using Output because IPC means Output might not be available
99 fprintf(stderr, "mmap failed: %s\n", strerror(errno));
100 exit(1);
101 }
102 nextAllocPtr = (uint8_t*)shmPtr;
103 memset(shmPtr, '\0', shmSize);
104
105 /* Construct our private buffer first. Used for our communications */
106 auto resResult = reserveSpace<InternalSharedData>((1 + numBuffers) * sizeof(size_t));
107 isd = resResult.second;
108 isd->expectedChildren = expectedChildren;
109 isd->shmSegSize = shmSize;
110 isd->numBuffers = numBuffers;
111
112 /* Construct user's shared-data region */
113 auto shareResult = reserveSpace<ShareDataType>(0);
114 isd->offsets[0] = shareResult.first;
115 sharedData = shareResult.second;
116
117 /* Construct the circular buffers */
118 const size_t cbSize = sizeof(MsgType) * bufferSize;
119 for ( size_t c = 0; c < isd->numBuffers; c++ ) {
120 CircBuff_t* cPtr = nullptr;
121
122 auto resResult = reserveSpace<CircBuff_t>(cbSize);
123 isd->offsets[1 + c] = resResult.first;
124 cPtr = resResult.second;
125 if ( !cPtr->setBufferSize(bufferSize) ) exit(1);
126 circBuffs.push_back(cPtr);
127 }
128 }
129
130 /**
131 * Access an existing Tunnel
132 * @param region_name Name of the shared-memory region to access
133 */
134 explicit IPCTunnel(const std::string& region_name) :
135 master(false),
136 shmPtr(nullptr),
137 fd(-1)
138 {
139 fd = shm_open(region_name.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
140 filename = region_name;
141
142 if ( fd < 0 ) {
143 // Not using Output because IPC means Output might not be available
144 fprintf(stderr, "Failed to open IPC region '%s': %s\n", filename.c_str(), strerror(errno));
145 exit(1);
146 }
147
148 shmPtr = mmap(nullptr, sizeof(InternalSharedData), PROT_READ, MAP_SHARED, fd, 0);
149 if ( shmPtr == MAP_FAILED ) {
150 // Not using Output because IPC means Output might not be available
151 fprintf(stderr, "mmap 0 failed: %s\n", strerror(errno));
152 exit(1);
153 }
154
155 isd = (InternalSharedData*)shmPtr;
156 shmSize = isd->shmSegSize;
157 munmap(shmPtr, sizeof(InternalSharedData));
158
159 shmPtr = mmap(nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
160 if ( shmPtr == MAP_FAILED ) {
161 // Not using Output because IPC means Output might not be available
162 fprintf(stderr, "mmap 1 failed: %s\n", strerror(errno));
163 exit(1);
164 }
165 isd = (InternalSharedData*)shmPtr;
166 sharedData = (ShareDataType*)((uint8_t*)shmPtr + isd->offsets[0]);
167
168 for ( size_t c = 0; c < isd->numBuffers; c++ ) {
169 circBuffs.push_back((CircBuff_t*)((uint8_t*)shmPtr + isd->offsets[c + 1]));
170 }
171
172 /* Clean up if we're the last to attach */
173 if ( --isd->expectedChildren == 0 ) {
174 shm_unlink(filename.c_str());
175 }
176 }
177
178 /**
179 * Destructor
180 */
181 virtual ~IPCTunnel() { shutdown(true); }
182
183 /**
184 * Shutdown
185 */
186 void shutdown(bool all = false)
187 {
188 if ( master ) {
189 for ( CircBuff_t* cb : circBuffs ) {
190 cb->~CircBuff_t();
191 }
192 }
193 if ( shmPtr ) {
194 munmap(shmPtr, shmSize);
195 shmPtr = nullptr;
196 shmSize = 0;
197 }
198 if ( fd >= 0 ) {
199 close(fd);
200 fd = -1;
201 }
202 }
203
204 const std::string& getRegionName() const { return filename; }
205
206 /** return a pointer to the ShareDataType region */
207 ShareDataType* getSharedData() { return sharedData; }
208
209 /** Blocks until space is available **/
210 void writeMessage(size_t core, const MsgType& command) { circBuffs[core]->write(command); }
211
212 /** Blocks until a command is available **/
213 MsgType readMessage(size_t buffer) { return circBuffs[buffer]->read(); }
214
215 /** Non-blocking version of readMessage **/
216 bool readMessageNB(size_t buffer, MsgType* result) { return circBuffs[buffer]->readNB(result); }
217
218 /** Empty the messages in the buffer **/
219 void clearBuffer(size_t core) { circBuffs[core]->clearBuffer(); }
220
221private:
222 template <typename T>
223 std::pair<size_t, T*> reserveSpace(size_t extraSpace = 0)
224 {
225 size_t space = sizeof(T) + extraSpace;
226 if ( ((nextAllocPtr + space) - (uint8_t*)shmPtr) > shmSize ) return std::make_pair<size_t, T*>(0, nullptr);
227 T* ptr = (T*)nextAllocPtr;
228 nextAllocPtr += space;
229 new (ptr) T(); // Call constructor if need be
230 return std::make_pair((uint8_t*)ptr - (uint8_t*)shmPtr, ptr);
231 }
232
233 size_t static calculateShmemSize(size_t numBuffers, size_t bufferSize)
234 {
235 long page_size = sysconf(_SC_PAGESIZE);
236
237 /* Count how many pages are needed, at minimum */
238 size_t isd = 1 + ((sizeof(InternalSharedData) + (1 + numBuffers) * sizeof(size_t)) / page_size);
239 size_t buffer = 1 + ((sizeof(CircBuff_t) + bufferSize * sizeof(MsgType)) / page_size);
240 size_t shdata = 1 + ((sizeof(ShareDataType) + sizeof(InternalSharedData)) / page_size);
241
242 /* Alloc 2 extra pages, just in case */
243 return (2 + isd + shdata + numBuffers * buffer) * page_size;
244 }
245
246protected:
247 /** Pointer to the Shared Data Region */
248 ShareDataType* sharedData;
249
250private:
251 bool master;
252 void* shmPtr;
253 int fd;
254
255 std::string filename;
256 uint8_t* nextAllocPtr;
257 size_t shmSize;
258 InternalSharedData* isd;
259 std::vector<CircBuff_t*> circBuffs;
260};
261
262} // namespace SST::Core::Interprocess
263
264#endif // SST_CORE_INTERPROCESS_IPCTUNNEL_H
Definition circularBuffer.h:24
ShareDataType * getSharedData()
return a pointer to the ShareDataType region
Definition ipctunnel.h:207
virtual ~IPCTunnel()
Destructor.
Definition ipctunnel.h:181
void clearBuffer(size_t core)
Empty the messages in the buffer.
Definition ipctunnel.h:219
IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren=1)
Construct a new Tunnel for IPC Communications.
Definition ipctunnel.h:63
void shutdown(bool all=false)
Shutdown.
Definition ipctunnel.h:186
void writeMessage(size_t core, const MsgType &command)
Blocks until space is available.
Definition ipctunnel.h:210
IPCTunnel(const std::string &region_name)
Access an existing Tunnel.
Definition ipctunnel.h:134
MsgType readMessage(size_t buffer)
Blocks until a command is available.
Definition ipctunnel.h:213
bool readMessageNB(size_t buffer, MsgType *result)
Non-blocking version of readMessage.
Definition ipctunnel.h:216
ShareDataType * sharedData
Pointer to the Shared Data Region.
Definition ipctunnel.h:248