00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #ifndef SST_CORE_OBJECTCOMMS_H
00014 #define SST_CORE_OBJECTCOMMS_H
00015
00016 #ifdef SST_CONFIG_HAVE_MPI
00017 #include <mpi.h>
00018 #endif
00019
00020 #include <boost/archive/polymorphic_binary_iarchive.hpp>
00021 #include <boost/archive/polymorphic_binary_oarchive.hpp>
00022 #include <boost/serialization/vector.hpp>
00023
00024 #include <boost/iostreams/stream_buffer.hpp>
00025 #include <boost/iostreams/stream.hpp>
00026 #include <boost/iostreams/device/back_inserter.hpp>
00027
00028 namespace SST {
00029
00030 namespace Comms {
00031
00032 template <typename dataType>
00033 std::vector<char> serialize(dataType *data)
00034 {
00035 std::vector<char> buffer;
00036
00037 boost::iostreams::back_insert_device<std::vector<char> > inserter(buffer);
00038 boost::iostreams::stream<boost::iostreams::back_insert_device<std::vector<char> > > output_stream(inserter);
00039 boost::archive::polymorphic_binary_oarchive oa(output_stream, boost::archive::no_header);
00040
00041 oa << data;
00042 output_stream.flush();
00043
00044 return buffer;
00045 }
00046
00047
00048 template <typename dataType>
00049 std::vector<char> serialize(dataType &data)
00050 {
00051 std::vector<char> buffer;
00052
00053 boost::iostreams::back_insert_device<std::vector<char> > inserter(buffer);
00054 boost::iostreams::stream<boost::iostreams::back_insert_device<std::vector<char> > > output_stream(inserter);
00055 boost::archive::polymorphic_binary_oarchive oa(output_stream, boost::archive::no_header);
00056
00057 oa << data;
00058 output_stream.flush();
00059
00060 return buffer;
00061 }
00062
00063
00064 template <typename dataType>
00065 dataType* deserialize(std::vector<char> &buffer)
00066 {
00067 dataType *tgt;
00068
00069 boost::iostreams::basic_array_source<char> source(buffer.data(), buffer.size());
00070 boost::iostreams::stream<boost::iostreams::basic_array_source <char> > input_stream(source);
00071 boost::archive::polymorphic_binary_iarchive ia(input_stream, boost::archive::no_header );
00072
00073 ia >> tgt;
00074
00075 return tgt;
00076 }
00077
00078 template <typename dataType>
00079 void deserialize(std::vector<char> &buffer, dataType &tgt)
00080 {
00081 boost::iostreams::basic_array_source<char> source(buffer.data(), buffer.size());
00082 boost::iostreams::stream<boost::iostreams::basic_array_source <char> > input_stream(source);
00083 boost::archive::polymorphic_binary_iarchive ia(input_stream, boost::archive::no_header );
00084
00085 ia >> tgt;
00086 }
00087
00088 template <typename dataType>
00089 void deserialize(char *buffer, int blen, dataType &tgt)
00090 {
00091 boost::iostreams::basic_array_source<char> source(buffer, blen);
00092 boost::iostreams::stream<boost::iostreams::basic_array_source <char> > input_stream(source);
00093 boost::archive::polymorphic_binary_iarchive ia(input_stream, boost::archive::no_header );
00094
00095 ia >> tgt;
00096 }
00097
00098
00099
00100 #ifdef SST_CONFIG_HAVE_MPI
00101 template <typename dataType>
00102 void broadcast(dataType& data, int root) {
00103 int rank = 0;
00104 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
00105 if ( root == rank ) {
00106
00107 std::vector<char> buffer = Comms::serialize(data);
00108
00109
00110 int size = buffer.size();
00111 MPI_Bcast(&size, 1, MPI_INT, root, MPI_COMM_WORLD);
00112
00113
00114 MPI_Bcast(buffer.data(), buffer.size(), MPI_BYTE, root, MPI_COMM_WORLD);
00115 }
00116 else {
00117
00118 int size = 0;
00119 MPI_Bcast(&size, 1, MPI_INT, root, MPI_COMM_WORLD);
00120
00121
00122 char* buffer = new char[size];
00123 MPI_Bcast(buffer, size, MPI_BYTE, root, MPI_COMM_WORLD);
00124
00125
00126 Comms::deserialize(buffer, size, data);
00127 }
00128 }
00129
00130 template <typename dataType>
00131 void send(int dest, int tag, dataType& data) {
00132
00133 std::vector<char> buffer = Comms::serialize(data);
00134
00135
00136
00137 int64_t size = buffer.size();
00138 MPI_Send(&size, 1, MPI_INT64_T, dest, tag, MPI_COMM_WORLD);
00139
00140 int32_t fragment_size = 1000000000;
00141 int64_t offset = 0;
00142
00143 while ( size >= fragment_size ) {
00144 MPI_Send(buffer.data() + offset, fragment_size, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
00145 size -= fragment_size;
00146 offset += fragment_size;
00147 }
00148 MPI_Send(buffer.data() + offset, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
00149 }
00150
00151 template <typename dataType>
00152 void recv(int src, int tag, dataType& data) {
00153
00154 int64_t size = 0;
00155 MPI_Status status;
00156 MPI_Recv(&size, 1, MPI_INT64_T, src, tag, MPI_COMM_WORLD, &status);
00157
00158
00159 char* buffer = new char[size];
00160 int64_t offset = 0;
00161 int32_t fragment_size = 1000000000;
00162 int64_t rem_size = size;
00163
00164 while ( rem_size >= fragment_size ) {
00165 MPI_Recv(buffer + offset, fragment_size, MPI_BYTE, src, tag, MPI_COMM_WORLD, &status);
00166 rem_size -= fragment_size;
00167 offset += fragment_size;
00168 }
00169 MPI_Recv(buffer + offset, rem_size, MPI_BYTE, src, tag, MPI_COMM_WORLD, &status);
00170
00171
00172
00173 Comms::deserialize(buffer, size, data);
00174 }
00175
00176
00177 template <typename dataType>
00178 void all_gather(const dataType& data, std::vector<dataType> &out_data) {
00179 int rank = 0, world = 0;
00180 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
00181 MPI_Comm_size(MPI_COMM_WORLD, &world);
00182
00183
00184 std::vector<char> buffer = Comms::serialize(data);
00185
00186
00187 size_t sendSize = buffer.size();
00188 int allSizes[world];
00189 int displ[world];
00190
00191 memset(allSizes, '\0', world * sizeof(int));
00192 memset(displ, '\0', world * sizeof(int));
00193
00194 MPI_Allgather(&sendSize, sizeof(int), MPI_BYTE,
00195 &allSizes, sizeof(int), MPI_BYTE, MPI_COMM_WORLD);
00196
00197 int totalBuf = 0;
00198 for ( int i = 0 ; i < world ; i++ ) {
00199 totalBuf += allSizes[i];
00200 if ( i > 0 )
00201 displ[i] = displ[i-1] + allSizes[i-1];
00202 }
00203
00204 char *bigBuff = new char[totalBuf];
00205
00206 MPI_Allgatherv(buffer.data(), buffer.size(), MPI_BYTE,
00207 bigBuff, allSizes, displ, MPI_BYTE, MPI_COMM_WORLD);
00208
00209 out_data.resize(world);
00210 for ( int i = 0 ; i < world ; i++ ) {
00211 Comms::deserialize(&bigBuff[displ[i]], allSizes[i], out_data[i]);
00212 }
00213
00214 delete bigBuff;
00215
00216 }
00217
00218
00219
00220
00221 #endif
00222
00223 }
00224
00225 }
00226
00227 #endif // SST_CORE_OBJECTCOMMS_H