00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 #ifndef SST_CORE_INTERPROCESS_TUNNEL_H
00013 #define SST_CORE_INTERPROCESS_TUNNEL_H 1
00014
00015
00016 #include <cstdio>
00017 #include <vector>
00018 #include <string>
00019 #include <unistd.h>
00020
00021 #include <sst/core/interprocess/circularBuffer.h>
00022
00023 #include <boost/interprocess/managed_shared_memory.hpp>
00024 #include <boost/interprocess/managed_xsi_shared_memory.hpp>
00025
00026
00027 namespace SST {
00028 namespace Core {
00029 namespace Interprocess {
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 template<typename ShareDataType, typename MsgType>
00040 class IPCTunnel {
00041
00042 typedef SST::Core::Interprocess::CircularBuffer<
00043 MsgType,
00044 boost::interprocess::allocator<MsgType,
00045 boost::interprocess::managed_xsi_shared_memory::segment_manager> >
00046 CircBuff_t;
00047
00048 struct InternalSharedData {
00049 size_t numBuffers;
00050 };
00051
00052 bool remove_old_shared_memory() {
00053 bool removed = false;
00054 try {
00055 boost::interprocess::xsi_shared_memory xsi(boost::interprocess::open_only, xkey);
00056 boost::interprocess::xsi_shared_memory::remove(xsi.get_shmid());
00057 removed = true;
00058 } catch (boost::interprocess::interprocess_exception & e) {
00059 if ( e.get_error_code() != boost::interprocess::not_found_error )
00060 throw ;
00061 }
00062 return removed;
00063 }
00064
00065 boost::interprocess::xsi_key get_xsi_key(const std::string &name) {
00066 xkey = boost::interprocess::xsi_key(name.c_str(), 1);
00067 return xkey;
00068 }
00069
00070 public:
00071
00072
00073
00074
00075
00076
00077 IPCTunnel(const std::string ®ion_name, size_t numBuffers,
00078 size_t bufferSize)
00079 {
00080 get_xsi_key(region_name);
00081
00082 remove_old_shared_memory();
00083
00084 shm = boost::interprocess::managed_xsi_shared_memory(
00085 boost::interprocess::create_only, xkey,
00086 calculateShmemSize(numBuffers, bufferSize));
00087
00088
00089 isd = shm.construct<InternalSharedData>("InternalShared")();
00090 isd->numBuffers = numBuffers;
00091
00092
00093 sharedData = shm.construct<ShareDataType>("Shared Data")();
00094
00095
00096 char bufName[1024];
00097 for ( size_t c = 0 ; c < numBuffers ; c++ ) {
00098 sprintf(bufName, "buffer%zu", c);
00099 circBuffs.push_back(shm.construct<CircBuff_t>(bufName)(
00100 bufferSize, shm.get_segment_manager()));
00101 }
00102
00103 }
00104
00105
00106
00107
00108
00109 IPCTunnel(const std::string ®ion_name)
00110 {
00111 get_xsi_key(region_name);
00112 shm = boost::interprocess::managed_xsi_shared_memory(
00113 boost::interprocess::open_only, xkey);
00114 isd = shm.find<InternalSharedData>("InternalShared").first;
00115 sharedData = shm.find<ShareDataType>("Shared Data").first;
00116
00117 char bufName[1024];
00118 for ( size_t c = 0 ; c < isd->numBuffers ; c++ ) {
00119 sprintf(bufName, "buffer%zu", c);
00120 circBuffs.push_back(shm.find<CircBuff_t>(bufName).first);
00121 }
00122 }
00123
00124
00125
00126
00127
00128 virtual ~IPCTunnel()
00129 {
00130 remove_old_shared_memory();
00131 }
00132
00133
00134
00135
00136 void shutdown(bool all = false)
00137 {
00138 if ( all ) {
00139 while (remove_old_shared_memory() );
00140 } else {
00141 remove_old_shared_memory();
00142 }
00143
00144 }
00145
00146
00147 ShareDataType* getSharedData() { return sharedData; }
00148
00149
00150 void writeMessage(size_t core, const MsgType &command) {
00151 circBuffs[core]->write(command);
00152 }
00153
00154
00155 MsgType readMessage(size_t buffer) {
00156 return circBuffs[buffer]->read();
00157 }
00158
00159
00160 bool readMessageNB(size_t buffer, MsgType *result) {
00161 return circBuffs[buffer]->readNB(result);
00162 }
00163
00164
00165
00166 private:
00167 size_t calculateShmemSize(size_t numBuffers, size_t bufferSize) const
00168 {
00169 long page_size = sysconf(_SC_PAGESIZE);
00170
00171
00172 size_t buffer = 1+ ((sizeof(CircBuff_t) +
00173 bufferSize*sizeof(MsgType)) / page_size);
00174 size_t shdata = 1+ ((sizeof(ShareDataType) + sizeof(InternalSharedData)) / page_size);
00175
00176
00177 return (2 + shdata + numBuffers*buffer) * page_size;
00178
00179 }
00180
00181 protected:
00182
00183 ShareDataType *sharedData;
00184
00185 private:
00186 boost::interprocess::xsi_key xkey;
00187 boost::interprocess::managed_xsi_shared_memory shm;
00188 InternalSharedData *isd;
00189 std::vector<CircBuff_t* > circBuffs;
00190
00191 };
00192
00193 }
00194 }
00195 }
00196
00197
00198
00199 #endif