SST  11.0.0
Structural Simulation Toolkit
objectComms.h
1 // Copyright 2009-2021 NTESS. Under the terms
2 // of Contract DE-NA0003525 with NTESS, the U.S.
3 // Government retains certain rights in this software.
4 //
5 // Copyright (c) 2009-2021, NTESS
6 // All rights reserved.
7 //
8 // This file is part of the SST software package. For license
9 // information, see the LICENSE file in the top level directory of the
10 // distribution.
11 
12 
13 #ifndef SST_CORE_OBJECTCOMMS_H
14 #define SST_CORE_OBJECTCOMMS_H
15 
16 #include "sst/core/warnmacros.h"
17 #ifdef SST_CONFIG_HAVE_MPI
18 DISABLE_WARN_MISSING_OVERRIDE
19 #include <mpi.h>
20 REENABLE_WARNING
21 #endif
22 
23 #include "sst/core/serialization/serializer.h"
24 
25 #include <typeinfo>
26 
27 namespace SST {
28 
29 namespace Comms {
30 
31 
32 template <typename dataType>
33 std::vector<char> serialize(dataType &data)
34 {
36 
37  ser.start_sizing();
38  ser & data;
39 
40  size_t size = ser.size();
41 
42  std::vector<char> buffer;
43  buffer.resize(size);
44 
45  ser.start_packing(buffer.data(),size);
46  ser & data;
47 
48  return buffer;
49 }
50 
51 // template <typename dataType>
52 // std::vector<char> serialize(dataType* data)
53 // {
54 // SST::Core::Serialization::serializer ser;
55 
56 // std::cout << typeid(data).name() << std::endl;
57 // ser.start_sizing();
58 // ser & data;
59 
60 // ser.start_sizing();
61 // ser & data;
62 
63 // int size = ser.size();
64 // std::cout << "serialize size = " << size << std::endl;
65 
66 // std::vector<char> buffer;
67 // buffer.resize(size);
68 
69 // ser.start_packing(buffer.data(),size);
70 // ser & data;
71 
72 // return buffer;
73 // }
74 
75 
76 template <typename dataType>
77 dataType* deserialize(std::vector<char> &buffer)
78 {
79  dataType *tgt = nullptr;
80 
82 
83  ser.start_unpacking(buffer.data(), buffer.size());
84  ser & tgt;
85 
86  return tgt;
87 }
88 
89 template <typename dataType>
90 void deserialize(std::vector<char> &buffer, dataType &tgt)
91 {
93 
94  ser.start_unpacking(buffer.data(), buffer.size());
95  ser & tgt;
96 }
97 
98 template <typename dataType>
99 void deserialize(char *buffer, int blen, dataType &tgt)
100 {
102 
103  ser.start_unpacking(buffer, blen);
104  ser & tgt;
105 }
106 
107 #ifdef SST_CONFIG_HAVE_MPI
108 template <typename dataType>
109 void broadcast(dataType& data, int root) {
110  int rank = 0;
111  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
112  if ( root == rank ) {
113  // Serialize the data
114  std::vector<char> buffer = Comms::serialize(data);
115 
116  // Now broadcast the size of the data
117  int size = buffer.size();
118  MPI_Bcast(&size, 1, MPI_INT, root, MPI_COMM_WORLD);
119 
120  // Now broadcast the data
121  MPI_Bcast(buffer.data(), buffer.size(), MPI_BYTE, root, MPI_COMM_WORLD);
122  }
123  else {
124  // Get the size of the broadcast
125  int size = 0;
126  MPI_Bcast(&size, 1, MPI_INT, root, MPI_COMM_WORLD);
127 
128  // Now get the data
129  char* buffer = new char[size];
130  MPI_Bcast(buffer, size, MPI_BYTE, root, MPI_COMM_WORLD);
131 
132  // Now deserialize data
133  Comms::deserialize(buffer, size, data);
134  }
135 }
136 
137 template <typename dataType>
138 void send(int dest, int tag, dataType& data) {
139  // Serialize the data
140  std::vector<char> buffer = Comms::serialize<dataType>(data);
141 
142  // Now send the data. Send size first, then payload
143  // std::cout<< sizeof(buffer.size()) << std::endl;
144  int64_t size = buffer.size();
145  MPI_Send(&size, 1, MPI_INT64_T, dest, tag, MPI_COMM_WORLD);
146 
147  int32_t fragment_size = 1000000000;
148  int64_t offset = 0;
149 
150  while ( size >= fragment_size ) {
151  MPI_Send(buffer.data() + offset, fragment_size, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
152  size -= fragment_size;
153  offset += fragment_size;
154  }
155  MPI_Send(buffer.data() + offset, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
156 }
157 
158 template <typename dataType>
159 void recv(int src, int tag, dataType& data) {
160  // Get the size of the broadcast
161  int64_t size = 0;
162  MPI_Status status;
163  MPI_Recv(&size, 1, MPI_INT64_T, src, tag, MPI_COMM_WORLD, &status);
164 
165  // Now get the data
166  char* buffer = new char[size];
167  int64_t offset = 0;
168  int32_t fragment_size = 1000000000;
169  int64_t rem_size = size;
170 
171  while ( rem_size >= fragment_size ) {
172  MPI_Recv(buffer + offset, fragment_size, MPI_BYTE, src, tag, MPI_COMM_WORLD, &status);
173  rem_size -= fragment_size;
174  offset += fragment_size;
175  }
176  MPI_Recv(buffer + offset, rem_size, MPI_BYTE, src, tag, MPI_COMM_WORLD, &status);
177 
178 
179  // Now deserialize data
180  Comms::deserialize(buffer, size, data);
181 }
182 
183 
184 template <typename dataType>
185 void all_gather(dataType& data, std::vector<dataType> &out_data) {
186  int rank = 0, world = 0;
187  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
188  MPI_Comm_size(MPI_COMM_WORLD, &world);
189 
190  // Serialize the data
191  std::vector<char> buffer = Comms::serialize(data);
192 
193 
194  size_t sendSize = buffer.size();
195  int allSizes[world];
196  int displ[world];
197 
198  memset(allSizes, '\0', world * sizeof(int));
199  memset(displ, '\0', world * sizeof(int));
200 
201  MPI_Allgather(&sendSize, sizeof(int), MPI_BYTE,
202  &allSizes, sizeof(int), MPI_BYTE, MPI_COMM_WORLD);
203 
204  int totalBuf = 0;
205  for ( int i = 0 ; i < world ; i++ ) {
206  totalBuf += allSizes[i];
207  if ( i > 0 )
208  displ[i] = displ[i-1] + allSizes[i-1];
209  }
210 
211  char *bigBuff = new char[totalBuf];
212 
213  MPI_Allgatherv(buffer.data(), buffer.size(), MPI_BYTE,
214  bigBuff, allSizes, displ, MPI_BYTE, MPI_COMM_WORLD);
215 
216  out_data.resize(world);
217  for ( int i = 0 ; i < world ; i++ ) {
218  Comms::deserialize(&bigBuff[displ[i]], allSizes[i], out_data[i]);
219  }
220 
221  delete [] bigBuff;
222 
223 }
224 
225 
226 
227 
228 #endif
229 
230 }
231 
232 } //namespace SST
233 
234 #endif // SST_CORE_OBJECTCOMMS_H
This class is basically a wrapper for objects to declare the order in which their members should be s...
Definition: serializer.h:35