12 #ifndef SST_CORE_INTERPROCESS_TUNNEL_H
13 #define SST_CORE_INTERPROCESS_TUNNEL_H 1
26 #include "sst/core/interprocess/circularBuffer.h"
30 namespace Interprocess {
33 extern uint32_t globalIPCTunnelCount;
42 template<
typename ShareDataType,
typename MsgType>
47 struct InternalSharedData {
48 volatile uint32_t expectedChildren;
62 IPCTunnel(uint32_t comp_id,
size_t numBuffers,
size_t bufferSize, uint32_t expectedChildren = 1) : master(true), shmPtr(nullptr), fd(-1)
65 memset(key,
'\0',
sizeof(key));
67 snprintf(key,
sizeof(key),
"/sst_shmem_%u-%" PRIu32
"-%d", getpid(), comp_id, rand());
70 fd = shm_open(filename.c_str(), O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
78 }
while ( (fd < 0) && (errno == EEXIST) );
81 fprintf(stderr,
"Failed to create IPC region '%s': %s\n", filename.c_str(), strerror(errno));
86 shmSize = calculateShmemSize(numBuffers, bufferSize);
87 if ( ftruncate(fd, shmSize) ) {
89 fprintf(stderr,
"Resizing shared file '%s' failed: %s\n", filename.c_str(), strerror(errno));
93 shmPtr = mmap(
nullptr, shmSize, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
94 if ( shmPtr == MAP_FAILED ) {
96 fprintf(stderr,
"mmap failed: %s\n", strerror(errno));
99 nextAllocPtr = (uint8_t*)shmPtr;
100 memset(shmPtr,
'\0', shmSize);
103 auto resResult = reserveSpace<InternalSharedData>((1+numBuffers)*
sizeof(
size_t));
104 isd = resResult.second;
105 isd->expectedChildren = expectedChildren;
106 isd->shmSegSize = shmSize;
107 isd->numBuffers = numBuffers;
110 auto shareResult = reserveSpace<ShareDataType>(0);
111 isd->offsets[0] = shareResult.first;
115 const size_t cbSize =
sizeof(MsgType) * bufferSize;
116 for (
size_t c = 0 ; c < isd->numBuffers ; c++ ) {
119 auto resResult = reserveSpace<CircBuff_t>(cbSize);
120 isd->offsets[1+c] = resResult.first;
121 cPtr = resResult.second;
122 if (!cPtr->setBufferSize(bufferSize))
124 circBuffs.push_back(cPtr);
133 IPCTunnel(
const std::string& region_name) : master(false), shmPtr(nullptr), fd(-1)
135 fd = shm_open(region_name.c_str(), O_RDWR, S_IRUSR|S_IWUSR);
136 filename = region_name;
140 fprintf(stderr,
"Failed to open IPC region '%s': %s\n",
141 filename.c_str(), strerror(errno));
145 shmPtr = mmap(
nullptr,
sizeof(InternalSharedData), PROT_READ, MAP_SHARED, fd, 0);
146 if ( shmPtr == MAP_FAILED ) {
148 fprintf(stderr,
"mmap 0 failed: %s\n", strerror(errno));
152 isd = (InternalSharedData*)shmPtr;
153 shmSize = isd->shmSegSize;
154 munmap(shmPtr,
sizeof(InternalSharedData));
156 shmPtr = mmap(
nullptr, shmSize, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
157 if ( shmPtr == MAP_FAILED ) {
159 fprintf(stderr,
"mmap 1 failed: %s\n", strerror(errno));
162 isd = (InternalSharedData*)shmPtr;
163 sharedData = (ShareDataType*)((uint8_t*)shmPtr + isd->offsets[0]);
165 for (
size_t c = 0 ; c < isd->numBuffers ; c++ ) {
166 circBuffs.push_back((
CircBuff_t*)((uint8_t*)shmPtr + isd->offsets[c+1]));
170 if ( --isd->expectedChildren == 0 ) {
171 shm_unlink(filename.c_str());
195 munmap(shmPtr, shmSize);
205 const std::string& getRegionName(
void)
const {
return filename; }
212 circBuffs[core]->write(command);
217 return circBuffs[buffer]->read();
222 return circBuffs[buffer]->readNB(result);
227 circBuffs[core]->clearBuffer();
232 template <
typename T>
233 std::pair<size_t, T*> reserveSpace(
size_t extraSpace = 0)
235 size_t space =
sizeof(T) + extraSpace;
236 if ( ((nextAllocPtr + space) - (uint8_t*)shmPtr) > shmSize )
237 return std::make_pair<size_t, T*>(0,
nullptr);
238 T* ptr = (T*)nextAllocPtr;
239 nextAllocPtr += space;
241 return std::make_pair((uint8_t*)ptr - (uint8_t*)shmPtr, ptr);
244 size_t static calculateShmemSize(
size_t numBuffers,
size_t bufferSize)
246 long page_size = sysconf(_SC_PAGESIZE);
249 size_t isd = 1 + ((
sizeof(InternalSharedData) + (1+numBuffers)*
sizeof(size_t)) / page_size);
250 size_t buffer = 1+ ((
sizeof(CircBuff_t) +
251 bufferSize*
sizeof(MsgType)) / page_size);
252 size_t shdata = 1+ ((
sizeof(ShareDataType) +
sizeof(InternalSharedData)) / page_size);
255 return (2 + isd + shdata + numBuffers*buffer) * page_size;
267 std::string filename;
268 uint8_t *nextAllocPtr;
270 InternalSharedData *isd;
271 std::vector<CircBuff_t* > circBuffs;
void shutdown(bool all=false)
Shutdown.
Definition: ipctunnel.h:187
IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren=1)
Construct a new Tunnel for IPC Communications.
Definition: ipctunnel.h:62
MsgType readMessage(size_t buffer)
Blocks until a command is available.
Definition: ipctunnel.h:216
Definition: circularBuffer.h:22
void writeMessage(size_t core, const MsgType &command)
Blocks until space is available.
Definition: ipctunnel.h:211
ShareDataType * sharedData
Pointer to the Shared Data Region.
Definition: ipctunnel.h:260
virtual ~IPCTunnel()
Destructor.
Definition: ipctunnel.h:179
bool readMessageNB(size_t buffer, MsgType *result)
Non-blocking version of readMessage.
Definition: ipctunnel.h:221
Tunneling class between two processes, connected by shared memory.
Definition: ipctunnel.h:43
ShareDataType * getSharedData()
return a pointer to the ShareDataType region
Definition: ipctunnel.h:208
IPCTunnel(const std::string ®ion_name)
Access an existing Tunnel.
Definition: ipctunnel.h:133
void clearBuffer(size_t core)
Empty the messages in the buffer.
Definition: ipctunnel.h:226