SST  11.1.0
StructuralSimulationToolkit
objectComms.h
1 // Copyright 2009-2021 NTESS. Under the terms
2 // of Contract DE-NA0003525 with NTESS, the U.S.
3 // Government retains certain rights in this software.
4 //
5 // Copyright (c) 2009-2021, NTESS
6 // All rights reserved.
7 //
8 // This file is part of the SST software package. For license
9 // information, see the LICENSE file in the top level directory of the
10 // distribution.
11 
12 #ifndef SST_CORE_OBJECTCOMMS_H
13 #define SST_CORE_OBJECTCOMMS_H
14 
15 #include "sst/core/serialization/serializer.h"
16 #include "sst/core/warnmacros.h"
17 
18 #ifdef SST_CONFIG_HAVE_MPI
19 DISABLE_WARN_MISSING_OVERRIDE
20 #include <mpi.h>
21 REENABLE_WARNING
22 #endif
23 
24 #include <memory>
25 #include <typeinfo>
26 
27 namespace SST {
28 
29 namespace Comms {
30 
31 template <typename dataType>
32 std::vector<char>
33 serialize(dataType& data)
34 {
36 
37  ser.start_sizing();
38  ser& data;
39 
40  size_t size = ser.size();
41 
42  std::vector<char> buffer;
43  buffer.resize(size);
44 
45  ser.start_packing(buffer.data(), size);
46  ser& data;
47 
48  return buffer;
49 }
50 
51 // template <typename dataType>
52 // std::vector<char> serialize(dataType* data)
53 // {
54 // SST::Core::Serialization::serializer ser;
55 
56 // std::cout << typeid(data).name() << std::endl;
57 // ser.start_sizing();
58 // ser & data;
59 
60 // ser.start_sizing();
61 // ser & data;
62 
63 // int size = ser.size();
64 // std::cout << "serialize size = " << size << std::endl;
65 
66 // std::vector<char> buffer;
67 // buffer.resize(size);
68 
69 // ser.start_packing(buffer.data(),size);
70 // ser & data;
71 
72 // return buffer;
73 // }
74 
75 template <typename dataType>
76 dataType*
77 deserialize(std::vector<char>& buffer)
78 {
79  dataType* tgt = nullptr;
80 
82 
83  ser.start_unpacking(buffer.data(), buffer.size());
84  ser& tgt;
85 
86  return tgt;
87 }
88 
89 template <typename dataType>
90 void
91 deserialize(std::vector<char>& buffer, dataType& tgt)
92 {
94 
95  ser.start_unpacking(buffer.data(), buffer.size());
96  ser& tgt;
97 }
98 
99 template <typename dataType>
100 void
101 deserialize(char* buffer, int blen, dataType& tgt)
102 {
104 
105  ser.start_unpacking(buffer, blen);
106  ser& tgt;
107 }
108 
109 #ifdef SST_CONFIG_HAVE_MPI
110 template <typename dataType>
111 void
112 broadcast(dataType& data, int root)
113 {
114  int rank = 0;
115  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
116  if ( root == rank ) {
117  // Serialize the data
118  std::vector<char> buffer = Comms::serialize(data);
119 
120  // Now broadcast the size of the data
121  int size = buffer.size();
122  MPI_Bcast(&size, 1, MPI_INT, root, MPI_COMM_WORLD);
123 
124  // Now broadcast the data
125  MPI_Bcast(buffer.data(), buffer.size(), MPI_BYTE, root, MPI_COMM_WORLD);
126  }
127  else {
128  // Get the size of the broadcast
129  int size = 0;
130  MPI_Bcast(&size, 1, MPI_INT, root, MPI_COMM_WORLD);
131 
132  // Now get the data
133  auto buffer = std::unique_ptr<char[]>(new char[size]);
134  MPI_Bcast(buffer.get(), size, MPI_BYTE, root, MPI_COMM_WORLD);
135 
136  // Now deserialize data
137  Comms::deserialize(buffer.get(), size, data);
138  }
139 }
140 
141 template <typename dataType>
142 void
143 send(int dest, int tag, dataType& data)
144 {
145  // Serialize the data
146  std::vector<char> buffer = Comms::serialize<dataType>(data);
147 
148  // Now send the data. Send size first, then payload
149  // std::cout<< sizeof(buffer.size()) << std::endl;
150  int64_t size = buffer.size();
151  MPI_Send(&size, 1, MPI_INT64_T, dest, tag, MPI_COMM_WORLD);
152 
153  int32_t fragment_size = 1000000000;
154  int64_t offset = 0;
155 
156  while ( size >= fragment_size ) {
157  MPI_Send(buffer.data() + offset, fragment_size, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
158  size -= fragment_size;
159  offset += fragment_size;
160  }
161  MPI_Send(buffer.data() + offset, size, MPI_BYTE, dest, tag, MPI_COMM_WORLD);
162 }
163 
164 template <typename dataType>
165 void
166 recv(int src, int tag, dataType& data)
167 {
168  // Get the size of the broadcast
169  int64_t size = 0;
170  MPI_Status status;
171  MPI_Recv(&size, 1, MPI_INT64_T, src, tag, MPI_COMM_WORLD, &status);
172 
173  // Now get the data
174  auto buffer = std::unique_ptr<char[]>(new char[size]);
175  int64_t offset = 0;
176  int32_t fragment_size = 1000000000;
177  int64_t rem_size = size;
178 
179  while ( rem_size >= fragment_size ) {
180  MPI_Recv(buffer.get() + offset, fragment_size, MPI_BYTE, src, tag, MPI_COMM_WORLD, &status);
181  rem_size -= fragment_size;
182  offset += fragment_size;
183  }
184  MPI_Recv(buffer.get() + offset, rem_size, MPI_BYTE, src, tag, MPI_COMM_WORLD, &status);
185 
186  // Now deserialize data
187  Comms::deserialize(buffer.get(), size, data);
188 }
189 
190 template <typename dataType>
191 void
192 all_gather(dataType& data, std::vector<dataType>& out_data)
193 {
194  int rank = 0, world = 0;
195  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
196  MPI_Comm_size(MPI_COMM_WORLD, &world);
197 
198  // Serialize the data
199  std::vector<char> buffer = Comms::serialize(data);
200 
201  size_t sendSize = buffer.size();
202  int allSizes[world];
203  int displ[world];
204 
205  memset(allSizes, '\0', world * sizeof(int));
206  memset(displ, '\0', world * sizeof(int));
207 
208  MPI_Allgather(&sendSize, sizeof(int), MPI_BYTE, &allSizes, sizeof(int), MPI_BYTE, MPI_COMM_WORLD);
209 
210  int totalBuf = 0;
211  for ( int i = 0; i < world; i++ ) {
212  totalBuf += allSizes[i];
213  if ( i > 0 ) displ[i] = displ[i - 1] + allSizes[i - 1];
214  }
215 
216  auto bigBuff = std::unique_ptr<char[]>(new char[totalBuf]);
217 
218  MPI_Allgatherv(buffer.data(), buffer.size(), MPI_BYTE, bigBuff.get(), allSizes, displ, MPI_BYTE, MPI_COMM_WORLD);
219 
220  out_data.resize(world);
221  for ( int i = 0; i < world; i++ ) {
222  auto* bbuf = bigBuff.get();
223  Comms::deserialize(&bbuf[displ[i]], allSizes[i], out_data[i]);
224  }
225 }
226 
227 #endif
228 
229 } // namespace Comms
230 
231 } // namespace SST
232 
233 #endif // SST_CORE_OBJECTCOMMS_H
This class is basically a wrapper for objects to declare the order in which their members should be s...
Definition: serializer.h:34