SST 15.0
Structural Simulation Toolkit
ipctunnel.h
1// Copyright 2009-2025 NTESS. Under the terms
2// of Contract DE-NA0003525 with NTESS, the U.S.
3// Government retains certain rights in this software.
4//
5// Copyright (c) 2009-2025, NTESS
6// All rights reserved.
7//
8// This file is part of the SST software package. For license
9// information, see the LICENSE file in the top level directory of the
10// distribution.
11
12#ifndef SST_CORE_INTERPROCESS_IPCTUNNEL_H
13#define SST_CORE_INTERPROCESS_IPCTUNNEL_H
14
15#include "sst/core/interprocess/circularBuffer.h"
16
17#include <cstdio>
18#include <cstring>
19#include <errno.h>
20#include <fcntl.h>
21#include <string>
22#include <sys/mman.h>
23#include <sys/stat.h>
24#include <unistd.h>
25#include <vector>
26
27namespace SST::Core::Interprocess {
28
29extern uint32_t globalIPCTunnelCount;
30/**
31 * Tunneling class between two processes, connected by shared memory.
32 * Supports multiple circular-buffer queues, and a generic region
33 * of memory for shared data.
34 *
35 * @tparam ShareDataType Type to put in the shared data region
36 * @tparam MsgType Type of messages being sent in the circular buffers
37 */
38template <typename ShareDataType, typename MsgType>
40{
41
43
// Bookkeeping header stored at the very start of the shared-memory segment.
// The creating process fills it in; an attaching process maps it first to learn
// the full segment size, then uses the offsets to locate each region.
44 struct InternalSharedData
45 {
46 volatile uint32_t expectedChildren; // Count-down of attaches; the attaching constructor decrements it and unlinks the region at 0
47 size_t shmSegSize; // Total size, in bytes, of the shared segment
48 size_t numBuffers; // Number of circular buffers laid out in the segment
49 size_t offsets[0]; // Byte offsets from segment start; numBuffers + 1 entries are used: [0] = shared-data region, [1 + c] = circular buffer c
50 };
51
52public:
/**
 * Construct a new Tunnel for IPC Communications
 * @param comp_id Component ID of owner
 * @param numBuffers Number of buffers for which we should tunnel
 * @param bufferSize Capacity of each buffer, in messages
 * @param expectedChildren Number of processes expected to attach; the
 *        region name is unlinked once that many have attached
 */
59 IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren = 1) :
60 master(true),
61 shmPtr(nullptr),
62 fd(-1)
63 {
64 char key[256];
65 memset(key, '\0', sizeof(key));
66 do {
67 snprintf(key, sizeof(key), "/sst_shmem_%u-%" PRIu32 "-%d", getpid(), comp_id, rand());
68 filename = key;
69
70 fd = shm_open(filename.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
71 /* There's a rare chance that a file we are looking to use exists.
72 * It's unlikely, but perhaps a previous run (with the same PID
73 * and random number) crashed before the * clients all connected.
74 *
75 * So, if we get an error, and the error is EEXIST, try again with
76 * a different random number.
77 */
78 } while ( (fd < 0) && (errno == EEXIST) );
79 if ( fd < 0 ) {
80 // Not using Output because IPC means Output might not be available
81 fprintf(stderr, "Failed to create IPC region '%s': %s\n", filename.c_str(), strerror(errno));
82 exit(1);
83 }
84
85 shmSize = calculateShmemSize(numBuffers, bufferSize);
86 if ( ftruncate(fd, shmSize) ) {
87 // Not using Output because IPC means Output might not be available
88 fprintf(stderr, "Resizing shared file '%s' failed: %s\n", filename.c_str(), strerror(errno));
89 exit(1);
90 }
91
92 shmPtr = mmap(nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
93 if ( shmPtr == MAP_FAILED ) {
94 // Not using Output because IPC means Output might not be available
95 fprintf(stderr, "mmap failed: %s\n", strerror(errno));
96 exit(1);
97 }
98 nextAllocPtr = (uint8_t*)shmPtr;
99 memset(shmPtr, '\0', shmSize);
100
101 /* Construct our private buffer first. Used for our communications */
102 auto resResult = reserveSpace<InternalSharedData>((1 + numBuffers) * sizeof(size_t));
103 isd = resResult.second;
104 isd->expectedChildren = expectedChildren;
105 isd->shmSegSize = shmSize;
106 isd->numBuffers = numBuffers;
107
108 /* Construct user's shared-data region */
109 auto shareResult = reserveSpace<ShareDataType>(0);
110 isd->offsets[0] = shareResult.first;
111 sharedData = shareResult.second;
112
113 /* Construct the circular buffers */
114 const size_t cbSize = sizeof(MsgType) * bufferSize;
115 for ( size_t c = 0; c < isd->numBuffers; c++ ) {
116 CircBuff_t* cPtr = nullptr;
117
118 auto resResult = reserveSpace<CircBuff_t>(cbSize);
119 isd->offsets[1 + c] = resResult.first;
120 cPtr = resResult.second;
121 if ( !cPtr->setBufferSize(bufferSize) ) exit(1);
122 circBuffs.push_back(cPtr);
123 }
124 }
125
126 /**
127 * Access an existing Tunnel
128 * @param region_name Name of the shared-memory region to access
129 */
130 explicit IPCTunnel(const std::string& region_name) :
131 master(false),
132 shmPtr(nullptr),
133 fd(-1)
134 {
135 fd = shm_open(region_name.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
136 filename = region_name;
137
138 if ( fd < 0 ) {
139 // Not using Output because IPC means Output might not be available
140 fprintf(stderr, "Failed to open IPC region '%s': %s\n", filename.c_str(), strerror(errno));
141 exit(1);
142 }
143
144 shmPtr = mmap(nullptr, sizeof(InternalSharedData), PROT_READ, MAP_SHARED, fd, 0);
145 if ( shmPtr == MAP_FAILED ) {
146 // Not using Output because IPC means Output might not be available
147 fprintf(stderr, "mmap 0 failed: %s\n", strerror(errno));
148 exit(1);
149 }
150
151 isd = (InternalSharedData*)shmPtr;
152 shmSize = isd->shmSegSize;
153 munmap(shmPtr, sizeof(InternalSharedData));
154
155 shmPtr = mmap(nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
156 if ( shmPtr == MAP_FAILED ) {
157 // Not using Output because IPC means Output might not be available
158 fprintf(stderr, "mmap 1 failed: %s\n", strerror(errno));
159 exit(1);
160 }
161 isd = (InternalSharedData*)shmPtr;
162 sharedData = (ShareDataType*)((uint8_t*)shmPtr + isd->offsets[0]);
163
164 for ( size_t c = 0; c < isd->numBuffers; c++ ) {
165 circBuffs.push_back((CircBuff_t*)((uint8_t*)shmPtr + isd->offsets[c + 1]));
166 }
167
168 /* Clean up if we're the last to attach */
169 if ( --isd->expectedChildren == 0 ) {
170 shm_unlink(filename.c_str());
171 }
172 }
173
174 /**
175 * Destructor
176 */
177 virtual ~IPCTunnel() { shutdown(true); }
178
179 /**
180 * Shutdown
181 */
182 void shutdown(bool all = false)
183 {
184 if ( master ) {
185 for ( CircBuff_t* cb : circBuffs ) {
186 cb->~CircBuff_t();
187 }
188 }
189 if ( shmPtr ) {
190 munmap(shmPtr, shmSize);
191 shmPtr = nullptr;
192 shmSize = 0;
193 }
194 if ( fd >= 0 ) {
195 close(fd);
196 fd = -1;
197 }
198 }
199
200 const std::string& getRegionName() const { return filename; }
201
202 /** return a pointer to the ShareDataType region */
203 ShareDataType* getSharedData() { return sharedData; }
204
205 /** Blocks until space is available **/
206 void writeMessage(size_t core, const MsgType& command) { circBuffs[core]->write(command); }
207
208 /** Blocks until a command is available **/
209 MsgType readMessage(size_t buffer) { return circBuffs[buffer]->read(); }
210
211 /** Non-blocking version of readMessage **/
212 bool readMessageNB(size_t buffer, MsgType* result) { return circBuffs[buffer]->readNB(result); }
213
214 /** Empty the messages in the buffer **/
215 void clearBuffer(size_t core) { circBuffs[core]->clearBuffer(); }
216
217private:
218 template <typename T>
219 std::pair<size_t, T*> reserveSpace(size_t extraSpace = 0)
220 {
221 size_t space = sizeof(T) + extraSpace;
222 if ( ((nextAllocPtr + space) - (uint8_t*)shmPtr) > shmSize ) return std::make_pair<size_t, T*>(0, nullptr);
223 T* ptr = (T*)nextAllocPtr;
224 nextAllocPtr += space;
225 new (ptr) T(); // Call constructor if need be
226 return std::make_pair((uint8_t*)ptr - (uint8_t*)shmPtr, ptr);
227 }
228
229 size_t static calculateShmemSize(size_t numBuffers, size_t bufferSize)
230 {
231 long page_size = sysconf(_SC_PAGESIZE);
232
233 /* Count how many pages are needed, at minimum */
234 size_t isd = 1 + ((sizeof(InternalSharedData) + (1 + numBuffers) * sizeof(size_t)) / page_size);
235 size_t buffer = 1 + ((sizeof(CircBuff_t) + bufferSize * sizeof(MsgType)) / page_size);
236 size_t shdata = 1 + ((sizeof(ShareDataType) + sizeof(InternalSharedData)) / page_size);
237
238 /* Alloc 2 extra pages, just in case */
239 return (2 + isd + shdata + numBuffers * buffer) * page_size;
240 }
241
242protected:
243 /** Pointer to the Shared Data Region */
244 ShareDataType* sharedData;
245
246private:
247 bool master;
248 void* shmPtr;
249 int fd;
250
251 std::string filename;
252 uint8_t* nextAllocPtr;
253 size_t shmSize;
254 InternalSharedData* isd;
255 std::vector<CircBuff_t*> circBuffs;
256};
257
258} // namespace SST::Core::Interprocess
259
260#endif // SST_CORE_INTERPROCESS_IPCTUNNEL_H
Definition circularBuffer.h:24
ShareDataType * getSharedData()
return a pointer to the ShareDataType region
Definition ipctunnel.h:203
virtual ~IPCTunnel()
Destructor.
Definition ipctunnel.h:177
void clearBuffer(size_t core)
Empty the messages in the buffer.
Definition ipctunnel.h:215
IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren=1)
Construct a new Tunnel for IPC Communications.
Definition ipctunnel.h:59
void shutdown(bool all=false)
Shutdown.
Definition ipctunnel.h:182
void writeMessage(size_t core, const MsgType &command)
Blocks until space is available.
Definition ipctunnel.h:206
IPCTunnel(const std::string &region_name)
Access an existing Tunnel.
Definition ipctunnel.h:130
MsgType readMessage(size_t buffer)
Blocks until a command is available.
Definition ipctunnel.h:209
bool readMessageNB(size_t buffer, MsgType *result)
Non-blocking version of readMessage.
Definition ipctunnel.h:212
ShareDataType * sharedData
Pointer to the Shared Data Region.
Definition ipctunnel.h:244