SST  12.0.0
StructuralSimulationToolkit
threadsafe.h
1 // Copyright 2009-2022 NTESS. Under the terms
2 // of Contract DE-NA0003525 with NTESS, the U.S.
3 // Government retains certain rights in this software.
4 //
5 // Copyright (c) 2009-2022, NTESS
6 // All rights reserved.
7 //
8 // This file is part of the SST software package. For license
9 // information, see the LICENSE file in the top level directory of the
10 // distribution.
11 
12 #ifndef SST_CORE_THREADSAFE_H
13 #define SST_CORE_THREADSAFE_H
14 
15 #if ( defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) )
16 #include <x86intrin.h>
17 #define sst_pause() _mm_pause()
18 #elif (defined(__arm__) || defined(__arm) || defined(__aarch64__))
19 #define sst_pause() __asm__ __volatile__("yield")
20 #elif defined(__PPC64__)
21 #define sst_pause() __asm__ __volatile__("or 27, 27, 27" ::: "memory");
22 #endif
23 
24 #include <atomic>
25 #include <condition_variable>
26 #include <mutex>
27 #include <thread>
28 #include <vector>
29 //#include <stdalign.h>
30 
31 #include "sst/core/profile.h"
32 
33 #include <time.h>
34 
35 namespace SST {
36 namespace Core {
37 namespace ThreadSafe {
38 
39 #if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8))
40 #define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
41 #define CACHE_ALIGNED_T
42 #else
43 #define CACHE_ALIGNED(type, name) alignas(64) type name
44 #define CACHE_ALIGNED_T alignas(64)
45 #endif
46 
47 class CACHE_ALIGNED_T Barrier
48 {
49  size_t origCount;
50  std::atomic<bool> enabled;
51  std::atomic<size_t> count, generation;
52 
53 public:
54  Barrier(size_t count) : origCount(count), enabled(true), count(count), generation(0) {}
55 
56  // Come g++ 4.7, this can become a delegating constructor
57  Barrier() : origCount(0), enabled(false), count(0), generation(0) {}
58 
59  /** ONLY call this while nobody is in wait() */
60  void resize(size_t newCount)
61  {
62  count = origCount = newCount;
63  generation.store(0);
64  enabled.store(true);
65  }
66 
67  /**
68  * Wait for all threads to reach this point.
69  * @return 0.0, or elapsed time spent waiting, if configured with --enable-profile
70  */
71  double wait()
72  {
73  double elapsed = 0.0;
74  if ( enabled ) {
75  auto startTime = SST::Core::Profile::now();
76 
77  size_t gen = generation.load(std::memory_order_acquire);
78  asm("" ::: "memory");
79  size_t c = count.fetch_sub(1) - 1;
80  if ( 0 == c ) {
81  /* We should release */
82  count.store(origCount);
83  asm("" ::: "memory");
84  /* Incrementing generation causes release */
85  generation.fetch_add(1, std::memory_order_release);
86  __sync_synchronize();
87  }
88  else {
89  /* Try spinning first */
90  uint32_t count = 0;
91  do {
92  count++;
93  if ( count < 1024 ) { sst_pause(); }
94  else if ( count < (1024 * 1024) ) {
95  std::this_thread::yield();
96  }
97  else {
98  struct timespec ts;
99  ts.tv_sec = 0;
100  ts.tv_nsec = 1000;
101  nanosleep(&ts, nullptr);
102  }
103  } while ( gen == generation.load(std::memory_order_acquire) );
104  }
105  elapsed = SST::Core::Profile::getElapsed(startTime);
106  }
107  return elapsed;
108  }
109 
110  void disable()
111  {
112  enabled.store(false);
113  count.store(0);
114  ++generation;
115  }
116 };
117 
118 #if 0
119 typedef std::mutex Spinlock;
120 #else
121 class Spinlock
122 {
123  std::atomic_flag latch = ATOMIC_FLAG_INIT;
124 
125 public:
126  Spinlock() {}
127 
128  inline void lock()
129  {
130  while ( latch.test_and_set(std::memory_order_acquire) ) {
131  sst_pause();
132 #if defined(__PPC64__)
133  __sync_synchronize();
134 #endif
135  }
136  }
137 
138  inline void unlock() { latch.clear(std::memory_order_release); }
139 };
140 #endif
141 
143 {
144 public:
145  EmptySpinlock() {}
146  inline void lock() {}
147 
148  inline void unlock() {}
149 };
150 
151 template <typename T>
153 {
154 
155  struct cell_t
156  {
157  std::atomic<size_t> sequence;
158  T data;
159  };
160 
161  bool initialized;
162  size_t dsize;
163  cell_t* data;
164  CACHE_ALIGNED(std::atomic<size_t>, rPtr);
165  CACHE_ALIGNED(std::atomic<size_t>, wPtr);
166 
167 public:
168  // BoundedQueue(size_t maxSize) : dsize(maxSize)
169  BoundedQueue(size_t maxSize) : initialized(false) { initialize(maxSize); }
170 
171  BoundedQueue() : initialized(false) {}
172 
173  void initialize(size_t maxSize)
174  {
175  if ( initialized ) return;
176  dsize = maxSize;
177  data = new cell_t[dsize];
178  for ( size_t i = 0; i < maxSize; i++ )
179  data[i].sequence.store(i);
180  rPtr.store(0);
181  wPtr.store(0);
182  // fprintf(stderr, "%p %p: %ld\n", &rPtr, &wPtr, ((intptr_t)&wPtr - (intptr_t)&rPtr));
183  initialized = true;
184  }
185 
186  ~BoundedQueue()
187  {
188  if ( initialized ) delete[] data;
189  }
190 
191  size_t size() const { return (wPtr.load() - rPtr.load()); }
192 
193  bool empty() const { return (rPtr.load() == wPtr.load()); }
194 
195  bool try_insert(const T& arg)
196  {
197  cell_t* cell = nullptr;
198  size_t pos = wPtr.load(std::memory_order_relaxed);
199  for ( ;; ) {
200  cell = &data[pos % dsize];
201  size_t seq = cell->sequence.load(std::memory_order_acquire);
202  intptr_t diff = (intptr_t)seq - (intptr_t)pos;
203  if ( 0 == diff ) {
204  if ( wPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
205  }
206  else if ( 0 > diff ) {
207  // fprintf(stderr, "diff = %ld: %zu - %zu\n", diff, seq, pos);
208  return false;
209  }
210  else {
211  pos = wPtr.load(std::memory_order_relaxed);
212  }
213  }
214  cell->data = arg;
215  cell->sequence.store(pos + 1, std::memory_order_release);
216  return true;
217  }
218 
219  bool try_remove(T& res)
220  {
221  cell_t* cell = nullptr;
222  size_t pos = rPtr.load(std::memory_order_relaxed);
223  for ( ;; ) {
224  cell = &data[pos % dsize];
225  size_t seq = cell->sequence.load(std::memory_order_acquire);
226  intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
227  if ( 0 == diff ) {
228  if ( rPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
229  }
230  else if ( 0 > diff ) {
231  return false;
232  }
233  else {
234  pos = rPtr.load(std::memory_order_relaxed);
235  }
236  }
237  res = cell->data;
238  cell->sequence.store(pos + dsize, std::memory_order_release);
239  return true;
240  }
241 
242  T remove()
243  {
244  while ( 1 ) {
245  T res;
246  if ( try_remove(res) ) { return res; }
247  sst_pause();
248  }
249  }
250 };
251 
252 template <typename T>
254 {
255  struct CACHE_ALIGNED_T Node
256  {
257  std::atomic<Node*> next;
258  T data;
259 
260  Node() : next(nullptr) {}
261  };
262 
263  CACHE_ALIGNED(Node*, first);
264  CACHE_ALIGNED(Node*, last);
265  CACHE_ALIGNED(Spinlock, consumerLock);
266  CACHE_ALIGNED(Spinlock, producerLock);
267 
268 public:
270  {
271  /* 'first' is a dummy value */
272  first = last = new Node();
273  }
274 
275  ~UnboundedQueue()
276  {
277  while ( first != nullptr ) { // release the list
278  Node* tmp = first;
279  first = tmp->next;
280  delete tmp;
281  }
282  }
283 
284  void insert(const T& t)
285  {
286  Node* tmp = new Node();
287  tmp->data = t;
288  std::lock_guard<Spinlock> lock(producerLock);
289  last->next = tmp; // publish to consumers
290  last = tmp; // swing last forward
291  }
292 
293  bool try_remove(T& result)
294  {
295  std::lock_guard<Spinlock> lock(consumerLock);
296  Node* theFirst = first;
297  Node* theNext = first->next;
298  if ( theNext != nullptr ) { // if queue is nonempty
299  result = theNext->data; // take it out
300  first = theNext; // swing first forward
301  delete theFirst; // delete the old dummy
302  return true;
303  }
304  return false;
305  }
306 
307  T remove()
308  {
309  while ( 1 ) {
310  T res;
311  if ( try_remove(res) ) { return res; }
312  sst_pause();
313  }
314  }
315 };
316 
317 } // namespace ThreadSafe
318 } // namespace Core
319 } // namespace SST
320 
321 #endif // SST_CORE_THREADSAFE_H
double wait()
Wait for all threads to reach this point.
Definition: threadsafe.h:71
void resize(size_t newCount)
ONLY call this while nobody is in wait()
Definition: threadsafe.h:60
Definition: threadsafe.h:121
Definition: threadsafe.h:253
Definition: threadsafe.h:47
Definition: threadsafe.h:142
Definition: threadsafe.h:152