12 #ifndef SST_CORE_INTERPROCESS_IPCTUNNEL_H 13 #define SST_CORE_INTERPROCESS_IPCTUNNEL_H 15 #include "sst/core/interprocess/circularBuffer.h" 33 extern uint32_t globalIPCTunnelCount;
42 template <
typename ShareDataType,
typename MsgType>
48 struct InternalSharedData
50 volatile uint32_t expectedChildren;
/**
 * Creating constructor: opens a brand-new shared-memory object, sizes and
 * maps it, then lays out the internal header, the ShareDataType region and
 * the circular buffers inside the mapping.
 *
 * NOTE(review): this listing has the original file's line numbers fused into
 * the text and skips several of them (for example 64-68 and 75-81), so the
 * initializer list, some braces and some statements are not visible here.
 *
 * @param comp_id           Folded into the generated region name.
 * @param numBuffers        Number of circular buffers to reserve.
 * @param bufferSize        Per-buffer capacity, counted in MsgType elements.
 * @param expectedChildren  Stored in the shared header as
 *                          isd->expectedChildren.
 */
63 IPCTunnel(uint32_t comp_id,
size_t numBuffers,
size_t bufferSize, uint32_t expectedChildren = 1) :
69 memset(key,
'\0',
sizeof(key));
// Candidate name has the form "/sst_shmem_<pid>-<comp_id>-<rand()>".
71 snprintf(key,
sizeof(key),
"/sst_shmem_%u-%" PRIu32
"-%d", getpid(), comp_id, rand());
// O_CREAT | O_EXCL makes the open fail with EEXIST when the name is taken.
74 fd = shm_open(filename.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
// Keep retrying for as long as the failure is a name collision.
82 }
while ( (fd < 0) && (errno == EEXIST) );
// Failure is reported on stderr.
// NOTE(review): the guard around this report is not visible in this listing.
85 fprintf(stderr,
"Failed to create IPC region '%s': %s\n", filename.c_str(), strerror(errno));
// Grow the freshly created object to the computed segment size.
89 shmSize = calculateShmemSize(numBuffers, bufferSize);
90 if ( ftruncate(fd, shmSize) ) {
92 fprintf(stderr,
"Resizing shared file '%s' failed: %s\n", filename.c_str(), strerror(errno));
// Map the whole segment read/write and shared.
96 shmPtr = mmap(
nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
97 if ( shmPtr == MAP_FAILED ) {
99 fprintf(stderr,
"mmap failed: %s\n", strerror(errno));
// The allocation cursor starts at the base of the mapping, which is then
// zero-filled.
102 nextAllocPtr = (uint8_t*)shmPtr;
103 memset(shmPtr,
'\0', shmSize);
// Header goes first, followed by room for (1 + numBuffers) size_t offsets.
// NOTE(review): reserveSpace() can return a null pointer when the request
// does not fit; the result is dereferenced here without a visible check.
106 auto resResult = reserveSpace<InternalSharedData>((1 + numBuffers) *
sizeof(
size_t));
107 isd = resResult.second;
108 isd->expectedChildren = expectedChildren;
109 isd->shmSegSize = shmSize;
110 isd->numBuffers = numBuffers;
// offsets[0] records where the ShareDataType region sits relative to the
// mapping base.
113 auto shareResult = reserveSpace<ShareDataType>(0);
114 isd->offsets[0] = shareResult.first;
// Payload bytes handed to each circular buffer on top of sizeof(CircBuff_t).
118 const size_t cbSize =
sizeof(MsgType) * bufferSize;
119 for (
size_t c = 0; c < isd->numBuffers; c++ ) {
// offsets[1 + c] records buffer c; the process exits with status 1 when the
// buffer rejects the requested size.
122 auto resResult = reserveSpace<CircBuff_t>(cbSize);
123 isd->offsets[1 + c] = resResult.first;
124 cPtr = resResult.second;
125 if ( !cPtr->setBufferSize(bufferSize) ) exit(1);
126 circBuffs.push_back(cPtr);
// Body of the attaching path: opens an already existing region by name
// (no O_CREAT), discovers its size from the shared header, maps it in full
// and rebuilds the local pointers from the offsets stored in that header.
// NOTE(review): the signature line (original line 134) and several guard and
// brace lines are absent from this listing.
139 fd = shm_open(region_name.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
140 filename = region_name;
144 fprintf(stderr,
"Failed to open IPC region '%s': %s\n", filename.c_str(), strerror(errno));
// First mapping: only the fixed-size header, read-only, to learn the full
// segment size.
148 shmPtr = mmap(
nullptr,
sizeof(InternalSharedData), PROT_READ, MAP_SHARED, fd, 0);
149 if ( shmPtr == MAP_FAILED ) {
151 fprintf(stderr,
"mmap 0 failed: %s\n", strerror(errno));
155 isd = (InternalSharedData*)shmPtr;
156 shmSize = isd->shmSegSize;
// Drop the temporary header-only mapping before mapping the whole segment.
157 munmap(shmPtr,
sizeof(InternalSharedData));
// Second mapping: the entire segment, read/write.
159 shmPtr = mmap(
nullptr, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
160 if ( shmPtr == MAP_FAILED ) {
162 fprintf(stderr,
"mmap 1 failed: %s\n", strerror(errno));
// Offsets in the header are relative to the mapping base: offsets[0] is the
// shared-data region, offsets[c + 1] is circular buffer c.
165 isd = (InternalSharedData*)shmPtr;
166 sharedData = (ShareDataType*)((uint8_t*)shmPtr + isd->offsets[0]);
168 for (
size_t c = 0; c < isd->numBuffers; c++ ) {
169 circBuffs.push_back((
CircBuff_t*)((uint8_t*)shmPtr + isd->offsets[c + 1]));
// When the counter of expected attachers reaches zero, unlink the region name.
// NOTE(review): this is a plain pre-decrement on a volatile field, not an
// atomic operation; confirm that concurrent attachers cannot race here.
173 if ( --isd->expectedChildren == 0 ) {
174 shm_unlink(filename.c_str());
194 munmap(shmPtr, shmSize);
204 const std::string& getRegionName()
const {
return filename; }
210 void writeMessage(
size_t core,
const MsgType& command) { circBuffs[core]->write(command); }
213 MsgType
readMessage(
size_t buffer) {
return circBuffs[buffer]->read(); }
216 bool readMessageNB(
size_t buffer, MsgType* result) {
return circBuffs[buffer]->readNB(result); }
/**
 * Carve sizeof(T) + extraSpace bytes out of the mapped region, starting at
 * the current allocation cursor (nextAllocPtr), and advance the cursor.
 *
 * NOTE(review): the braces and original line 229 are absent from this
 * listing, so there may be a statement between the cursor update and the
 * return that is not shown here.
 * NOTE(review): the cursor advances by exactly `space` bytes; no alignment
 * padding is applied in the visible code.
 *
 * @tparam T          Type the returned pointer is cast to.
 * @param extraSpace  Additional bytes reserved directly after the T.
 * @return (byte offset from the mapping base, pointer to the reserved area),
 *         or (0, nullptr) when the request would run past shmSize.
 */
222 template <
typename T>
223 std::pair<size_t, T*> reserveSpace(
size_t extraSpace = 0)
225 size_t space =
sizeof(T) + extraSpace;
// NOTE(review): the left-hand side is a signed pointer difference; shmSize
// appears to be a size_t (it is assigned from calculateShmemSize) -- confirm
// the mixed-sign comparison is intended.
226 if ( ((nextAllocPtr + space) - (uint8_t*)shmPtr) > shmSize )
return std::make_pair<size_t, T*>(0,
nullptr);
227 T* ptr = (T*)nextAllocPtr;
228 nextAllocPtr += space;
230 return std::make_pair((uint8_t*)ptr - (uint8_t*)shmPtr, ptr);
/**
 * Compute the size, in bytes, of the shared-memory segment for a tunnel.
 * The result is always a whole number of pages.
 *
 * Each component is converted to pages as 1 + (bytes / page_size), i.e. the
 * integer quotient plus one page, and two further pages are added on top.
 *
 * NOTE(review): original lines 234, 236-237 and 241-242 are absent from this
 * listing.
 *
 * @param numBuffers  Number of circular buffers.
 * @param bufferSize  Per-buffer capacity, counted in MsgType elements.
 * @return Segment size in bytes.
 */
233 size_t static calculateShmemSize(
size_t numBuffers,
size_t bufferSize)
// NOTE(review): no check of the sysconf() result is visible in this listing.
235 long page_size = sysconf(_SC_PAGESIZE);
// Pages for the header plus its (1 + numBuffers) size_t offsets.
238 size_t isd = 1 + ((
sizeof(InternalSharedData) + (1 + numBuffers) *
sizeof(size_t)) / page_size);
// Pages for ONE circular buffer: control block plus bufferSize messages.
239 size_t buffer = 1 + ((
sizeof(CircBuff_t) + bufferSize *
sizeof(MsgType)) / page_size);
// Pages for the user's shared-data region (sizeof(InternalSharedData) is
// counted again in this term).
240 size_t shdata = 1 + ((
sizeof(ShareDataType) +
sizeof(InternalSharedData)) / page_size);
243 return (2 + isd + shdata + numBuffers * buffer) * page_size;
255 std::string filename;
256 uint8_t* nextAllocPtr;
258 InternalSharedData* isd;
259 std::vector<CircBuff_t*> circBuffs;
264 #endif // SST_CORE_INTERPROCESS_IPCTUNNEL_H
void shutdown(bool all=false)
Shutdown.
Definition: ipctunnel.h:186
Definition: circularBuffer.h:20
IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren=1)
Construct a new Tunnel for IPC Communications.
Definition: ipctunnel.h:63
MsgType readMessage(size_t buffer)
Blocks until a command is available.
Definition: ipctunnel.h:213
Definition: circularBuffer.h:23
void writeMessage(size_t core, const MsgType &command)
Blocks until space is available.
Definition: ipctunnel.h:210
ShareDataType * sharedData
Pointer to the Shared Data Region.
Definition: ipctunnel.h:248
virtual ~IPCTunnel()
Destructor.
Definition: ipctunnel.h:181
bool readMessageNB(size_t buffer, MsgType *result)
Non-blocking version of readMessage.
Definition: ipctunnel.h:216
Tunneling class between two processes, connected by shared memory.
Definition: ipctunnel.h:43
ShareDataType * getSharedData()
return a pointer to the ShareDataType region
Definition: ipctunnel.h:207
IPCTunnel(const std::string &region_name)
Access an existing Tunnel.
Definition: ipctunnel.h:134
void clearBuffer(size_t core)
Empty the messages in the buffer.
Definition: ipctunnel.h:219