13 #ifndef SST_CORE_OBJECTCOMMS_H
14 #define SST_CORE_OBJECTCOMMS_H
16 #include "sst/core/warnmacros.h"
17 #ifdef SST_CONFIG_HAVE_MPI
18 DISABLE_WARN_MISSING_OVERRIDE
23 #include "sst/core/serialization/serializer.h"
32 template <
typename dataType>
33 std::vector<char> serialize(dataType &data)
40 int size = ser.size();
42 std::vector<char> buffer;
45 ser.start_packing(buffer.data(),size);
76 template <
typename dataType>
77 dataType* deserialize(std::vector<char> &buffer)
79 dataType *tgt =
nullptr;
83 ser.start_unpacking(buffer.data(), buffer.size());
89 template <
typename dataType>
90 void deserialize(std::vector<char> &buffer, dataType &tgt)
94 ser.start_unpacking(buffer.data(), buffer.size());
98 template <
typename dataType>
99 void deserialize(
char *buffer,
int blen, dataType &tgt)
103 ser.start_unpacking(buffer, blen);
107 #ifdef SST_CONFIG_HAVE_MPI
108 template <
typename dataType>
109 void broadcast(dataType& data,
int root) {
111 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
112 if ( root == rank ) {
114 std::vector<char> buffer = Comms::serialize(data);
117 int size = buffer.size();
118 MPI_Bcast(&size, 1, MPI_INT, root, MPI_COMM_WORLD);
121 MPI_Bcast(buffer.data(), buffer.size(), MPI_BYTE, root, MPI_COMM_WORLD);
126 MPI_Bcast(&size, 1, MPI_INT, root, MPI_COMM_WORLD);
129 char* buffer =
new char[size];
130 MPI_Bcast(buffer, size, MPI_BYTE, root, MPI_COMM_WORLD);
133 Comms::deserialize(buffer, size, data);
137 template <
typename dataType>
138 void send(
int dest,
int tag, dataType& data) {
140 std::vector<char> buffer = Comms::serialize<dataType>(data);
144 int64_t size = buffer.size();
145 MPI_Send(&size, 1, MPI_INT64_T, dest, tag, MPI_COMM_WORLD);
147 int32_t fragment_size = 1000000000;
150 while ( size >= fragment_size ) {
151 MPI_Send(buffer.data() + offset, fragment_size, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
152 size -= fragment_size;
153 offset += fragment_size;
155 MPI_Send(buffer.data() + offset, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
158 template <
typename dataType>
159 void recv(
int src,
int tag, dataType& data) {
163 MPI_Recv(&size, 1, MPI_INT64_T, src, tag, MPI_COMM_WORLD, &status);
166 char* buffer =
new char[size];
168 int32_t fragment_size = 1000000000;
169 int64_t rem_size = size;
171 while ( rem_size >= fragment_size ) {
172 MPI_Recv(buffer + offset, fragment_size, MPI_BYTE, src, tag, MPI_COMM_WORLD, &status);
173 rem_size -= fragment_size;
174 offset += fragment_size;
176 MPI_Recv(buffer + offset, rem_size, MPI_BYTE, src, tag, MPI_COMM_WORLD, &status);
180 Comms::deserialize(buffer, size, data);
184 template <
typename dataType>
185 void all_gather(dataType& data, std::vector<dataType> &out_data) {
186 int rank = 0, world = 0;
187 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
188 MPI_Comm_size(MPI_COMM_WORLD, &world);
191 std::vector<char> buffer = Comms::serialize(data);
194 size_t sendSize = buffer.size();
198 memset(allSizes,
'\0', world *
sizeof(
int));
199 memset(displ,
'\0', world *
sizeof(
int));
201 MPI_Allgather(&sendSize,
sizeof(
int), MPI_BYTE,
202 &allSizes,
sizeof(
int), MPI_BYTE, MPI_COMM_WORLD);
205 for (
int i = 0 ; i < world ; i++ ) {
206 totalBuf += allSizes[i];
208 displ[i] = displ[i-1] + allSizes[i-1];
211 char *bigBuff =
new char[totalBuf];
213 MPI_Allgatherv(buffer.data(), buffer.size(), MPI_BYTE,
214 bigBuff, allSizes, displ, MPI_BYTE, MPI_COMM_WORLD);
216 out_data.resize(world);
217 for (
int i = 0 ; i < world ; i++ ) {
218 Comms::deserialize(&bigBuff[displ[i]], allSizes[i], out_data[i]);
234 #endif // SST_CORE_OBJECTCOMMS_H
This class is basically a wrapper for objects to declare the order in which their members should be s...
Definition: serializer.h:35