12 #ifndef SST_CORE_INTERPROCESS_TUNNEL_H 
   13 #define SST_CORE_INTERPROCESS_TUNNEL_H 1 
   26 #include <sst/core/interprocess/circularBuffer.h> 
   30 namespace Interprocess {
 
   33 extern uint32_t globalIPCTunnelCount;
 
   42 template<
typename ShareDataType, 
typename MsgType>
 
   47     struct InternalSharedData {
 
   48         volatile uint32_t expectedChildren;
 
   // Master-side constructor: creates and initializes a new shared-memory tunnel.
   // NOTE: this listing omits some original lines (see the gaps in the embedded
   // line numbers), so braces and some control flow are not visible here.
   //
   // comp_id          - component id, embedded in the generated region name
   // numBuffers       - number of circular buffers carved out of the region
   // bufferSize       - per-buffer capacity, counted in MsgType entries
   // expectedChildren - value stored in the shared header (defaults to 1)
   62     IPCTunnel(uint32_t comp_id, 
size_t numBuffers, 
size_t bufferSize, uint32_t expectedChildren = 1) : master(true), shmPtr(NULL), fd(-1)
 
   // Clear the name buffer before formatting into it.
   65         memset(key, 
'\0', 
sizeof(key));
 
   // Candidate region name built from the pid, the component id and a rand() value.
   67             snprintf(key, 
sizeof(key), 
"/sst_shmem_%u-%" PRIu32 
"-%d", getpid(), comp_id, rand());
 
   // O_CREAT|O_EXCL: creation fails if an object with this name already exists.
   // The object is created readable/writable by the owner only.
   70             fd = shm_open(filename.c_str(), O_RDWR|O_CREAT|O_EXCL, S_IRUSR|S_IWUSR);
 
   // Loop condition: keep trying while creation failed because the name was taken.
   78         } 
while ( (fd < 0) && (errno == EEXIST) );
 
   // Error report for a failed shm_open (the guarding condition is not in this excerpt).
   81             fprintf(stderr, 
"Failed to create IPC region '%s': %s\n", filename.c_str(), strerror(errno));
 
   // Total segment size for the header, shared data and all buffers.
   86         shmSize = calculateShmemSize(numBuffers, bufferSize);
 
   // Grow the freshly created object to the computed size; non-zero means failure.
   87         if ( ftruncate(fd, shmSize) ) {
 
   89             fprintf(stderr, 
"Resizing shared file '%s' failed: %s\n", filename.c_str(), strerror(errno));
 
   // Map the whole segment read/write, shared with other processes.
   93         shmPtr = mmap(NULL, shmSize, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
 
   94         if ( shmPtr == MAP_FAILED ) {
 
   96             fprintf(stderr, 
"mmap failed: %s\n", strerror(errno));
 
   // Allocation cursor starts at the beginning of the mapping.
   99         nextAllocPtr = (uint8_t*)shmPtr;
 
   // Zero the whole segment before carving it up.
  100         memset(shmPtr, 
'\0', shmSize);
 
   // Header first; the extra space holds (1 + numBuffers) size_t offset entries.
   // NOTE(review): no check of the returned pointer is visible in this excerpt,
   // although reserveSpace() can return NULL when the request does not fit.
  103         auto resResult = reserveSpace<InternalSharedData>((1+numBuffers)*
sizeof(
size_t));
 
  104         isd = resResult.second;
 
  105         isd->expectedChildren = expectedChildren;
 
  106         isd->shmSegSize = shmSize;
 
  107         isd->numBuffers = numBuffers;
 
   // Shared-data area next; its offset from the mapping base goes into slot 0.
  110         auto shareResult = reserveSpace<ShareDataType>(0);
 
  111         isd->offsets[0] = shareResult.first;
 
   // Payload bytes needed by one circular buffer.
  115         const size_t cbSize = 
sizeof(MsgType) * bufferSize;
 
   // One circular buffer per requested buffer; buffer c is recorded in offsets[1+c].
  116         for ( 
size_t c = 0 ; c < isd->numBuffers ; c++ ) {
 
  119             auto resResult = reserveSpace<CircBuff_t>(cbSize);
 
  120             isd->offsets[1+c] = resResult.first;
 
  121             cPtr = resResult.second;
 
  122             cPtr->setBufferSize(bufferSize);
 
  123             circBuffs.push_back(cPtr);
 
  // Attach-side constructor: opens an existing tunnel by region name.
  // NOTE: this listing omits some original lines (see the gaps in the embedded
  // line numbers), so braces and some control flow are not visible here.
  //
  // region_name - name of the shared-memory object to open; also stored in
  //               `filename`.
  132     IPCTunnel(
const std::string &region_name) : master(false), shmPtr(NULL), fd(-1)
 
  // No O_CREAT: the region must already exist.
  134         fd = shm_open(region_name.c_str(), O_RDWR, S_IRUSR|S_IWUSR);
 
  135         filename = region_name;
 
  // Error report for a failed shm_open (the guarding condition is not in this excerpt).
  139             fprintf(stderr, 
"Failed to open IPC region '%s': %s\n",
 
  140                     filename.c_str(), strerror(errno));
 
  // First mapping: read-only and just large enough for the header, so the
  // full segment size can be read from it.
  144         shmPtr = mmap(NULL, 
sizeof(InternalSharedData), PROT_READ, MAP_SHARED, fd, 0);
 
  145         if ( shmPtr == MAP_FAILED ) {
 
  147             fprintf(stderr, 
"mmap 0 failed: %s\n", strerror(errno));
 
  151         isd = (InternalSharedData*)shmPtr;
 
  152         shmSize = isd->shmSegSize;
 
  // Drop the header-only mapping before mapping the whole segment.
  153         munmap(shmPtr, 
sizeof(InternalSharedData));
 
  // Second mapping: the whole segment, read/write.
  155         shmPtr = mmap(NULL, shmSize, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
 
  156         if ( shmPtr == MAP_FAILED ) {
 
  158             fprintf(stderr, 
"mmap 1 failed: %s\n", strerror(errno));
 
  // The header sits at the start of the mapping; offsets[0] locates the shared data.
  161         isd = (InternalSharedData*)shmPtr;
 
  162         sharedData = (ShareDataType*)((uint8_t*)shmPtr + isd->offsets[0]);
 
  // Buffer c is located at offsets[c+1] from the mapping base.
  164         for ( 
size_t c = 0 ; c < isd->numBuffers ; c++ ) {
 
  165             circBuffs.push_back((
CircBuff_t*)((uint8_t*)shmPtr + isd->offsets[c+1]));
 
  // When the counter reaches zero, remove the region's name.
  // NOTE(review): this is a plain decrement of a volatile field in shared
  // memory, not an atomic operation; confirm that children never attach
  // concurrently.
  169         if ( --isd->expectedChildren == 0 ) {
 
  170             shm_unlink(filename.c_str());
 
  194             munmap(shmPtr, shmSize);
 
  204     const std::string& getRegionName(
void)
 const { 
return filename; }
 
  211         circBuffs[core]->write(command);
 
  216         return circBuffs[buffer]->read();
 
  221         return circBuffs[buffer]->readNB(result);
 
  226        circBuffs[core]->clearBuffer();
 
  // Hands out the next sizeof(T) + extraSpace bytes of the mapped segment.
  //
  // extraSpace - additional bytes to reserve directly after the T object
  //
  // Returns a pair of (byte offset of the reservation from shmPtr, pointer to
  // it). If the reservation would end beyond shmSize bytes from shmPtr, returns
  // (0, NULL) and leaves the allocation cursor untouched.
  //
  // NOTE(review): the listing skips original line 239, which sits between the
  // cursor bump and the final return; confirm against the full header that no
  // statement is missing there.
  231     template <
typename T>
 
  232     std::pair<size_t, T*> reserveSpace(
size_t extraSpace = 0)
 
  234         size_t space = 
sizeof(T) + extraSpace;
 
  // Bounds check: end of the requested range, measured from the mapping base.
  235         if ( ((nextAllocPtr + space) - (uint8_t*)shmPtr) > shmSize )
 
  236             return std::make_pair<size_t, T*>(0, NULL);
 
  237         T* ptr = (T*)nextAllocPtr;
 
  // Advance the cursor past this reservation.
  238         nextAllocPtr += space;
 
  240         return std::make_pair((uint8_t*)ptr - (uint8_t*)shmPtr, ptr);
 
  // Computes the size in bytes of the shared-memory segment, as a whole number
  // of pages.
  //
  // numBuffers - number of circular buffers the segment must hold
  // bufferSize - per-buffer capacity, counted in MsgType entries
  //
  // Each component is sized as 1 + bytes / page_size pages, i.e. always at least
  // one page, and one page more than strictly needed when bytes is an exact
  // multiple of the page size.
  // NOTE(review): no check of sysconf()'s return value is visible in this excerpt.
  243     size_t static calculateShmemSize(
size_t numBuffers, 
size_t bufferSize)
 
  245         long page_size = sysconf(_SC_PAGESIZE);
 
  // Pages for the header plus its (1 + numBuffers) size_t offset entries.
  248         size_t isd = 1 + ((
sizeof(InternalSharedData) + (1+numBuffers)*
sizeof(size_t)) / page_size);
 
  // Pages for a single circular buffer: its control structure plus the payload.
  249         size_t buffer = 1+ ((
sizeof(CircBuff_t) +
 
  250                 bufferSize*
sizeof(MsgType)) / page_size);
 
  // Pages for the shared-data area; sizeof(InternalSharedData) is counted here as well.
  251         size_t shdata = 1+ ((
sizeof(ShareDataType) + 
sizeof(InternalSharedData)) / page_size);
 
  // Two additional pages on top of the per-component totals, converted to bytes.
  254         return (2 + isd + shdata + numBuffers*buffer) * page_size;
 
  // Name of the shared-memory region; passed to shm_open()/shm_unlink() and
  // returned by getRegionName().
  264     std::string filename;
 
  // Allocation cursor inside the mapping; reserveSpace() hands out memory from
  // here and advances it.
  266     uint8_t *nextAllocPtr;
 
  // Header located at the start of the mapping.
  268     InternalSharedData *isd;
 
  // One pointer per circular buffer inside the mapping.
  269     std::vector<CircBuff_t* > circBuffs;
 
void shutdown(bool all=false)
Shutdown. 
Definition: ipctunnel.h:186
IPCTunnel(uint32_t comp_id, size_t numBuffers, size_t bufferSize, uint32_t expectedChildren=1)
Construct a new Tunnel for IPC Communications. 
Definition: ipctunnel.h:62
MsgType readMessage(size_t buffer)
Blocks until a command is available. 
Definition: ipctunnel.h:215
Definition: circularBuffer.h:22
void writeMessage(size_t core, const MsgType &command)
Blocks until space is available. 
Definition: ipctunnel.h:210
ShareDataType * sharedData
Pointer to the Shared Data Region. 
Definition: ipctunnel.h:259
virtual ~IPCTunnel()
Destructor. 
Definition: ipctunnel.h:178
bool readMessageNB(size_t buffer, MsgType *result)
Non-blocking version of readMessage. 
Definition: ipctunnel.h:220
Tunneling class between two processes, connected by shared memory. 
Definition: ipctunnel.h:43
ShareDataType * getSharedData()
Return a pointer to the ShareDataType region. 
Definition: ipctunnel.h:207
IPCTunnel(const std::string &region_name)
Access an existing Tunnel. 
Definition: ipctunnel.h:132
void clearBuffer(size_t core)
Empty the messages in the buffer. 
Definition: ipctunnel.h:225