threadsafe.h
// Copyright 2009-2021 NTESS. Under the terms
// of Contract DE-NA0003525 with NTESS, the U.S.
// Government retains certain rights in this software.
//
// Copyright (c) 2009-2021, NTESS
// All rights reserved.
//
// This file is part of the SST software package. For license
// information, see the LICENSE file in the top level directory of the
// distribution.

#ifndef SST_CORE_THREADSAFE_H
#define SST_CORE_THREADSAFE_H

#if ( defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) )
#include <x86intrin.h>
#define sst_pause() _mm_pause()
#elif (defined(__arm__) || defined(__arm) || defined(__aarch64__))
#define sst_pause() __asm__ __volatile__("yield")
#elif defined(__PPC64__)
#define sst_pause() __asm__ __volatile__("or 27, 27, 27" ::: "memory")
#endif

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>
//#include <stdalign.h>

#include "sst/core/profile.h"

#include <time.h>

namespace SST {
namespace Core {
namespace ThreadSafe {

#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8))
#define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
#define CACHE_ALIGNED_T
#else
#define CACHE_ALIGNED(type, name) alignas(64) type name
#define CACHE_ALIGNED_T alignas(64)
#endif
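
// Usage sketch (illustrative only, not part of this header): CACHE_ALIGNED places a
// member on its own 64-byte cache line and CACHE_ALIGNED_T does the same for a whole
// type, which limits false sharing between threads that write neighboring members.
// The struct and member names below are hypothetical, not SST types.
//
//   struct CACHE_ALIGNED_T Counters
//   {
//       CACHE_ALIGNED(std::atomic<uint64_t>, produced); // written by producer threads
//       CACHE_ALIGNED(std::atomic<uint64_t>, consumed); // written by consumer threads
//   };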

class CACHE_ALIGNED_T Barrier
{
    size_t              origCount;
    std::atomic<bool>   enabled;
    std::atomic<size_t> count, generation;

public:
    Barrier(size_t count) : origCount(count), enabled(true), count(count), generation(0) {}

    // Come g++ 4.7, this can become a delegating constructor
    Barrier() : origCount(0), enabled(false), count(0), generation(0) {}

    /** ONLY call this while nobody is in wait() */
    void resize(size_t newCount)
    {
        count = origCount = newCount;
        generation.store(0);
        enabled.store(true);
    }

    /**
     * Wait for all threads to reach this point.
     * @return 0.0, or elapsed time spent waiting, if configured with --enable-profile
     */
    double wait()
    {
        double elapsed = 0.0;
        if ( enabled ) {
            auto startTime = SST::Core::Profile::now();

            size_t gen = generation.load(std::memory_order_acquire);
            asm("" ::: "memory");
            size_t c = count.fetch_sub(1) - 1;
            if ( 0 == c ) {
                /* We should release */
                count.store(origCount);
                asm("" ::: "memory");
                /* Incrementing generation causes release */
                generation.fetch_add(1, std::memory_order_release);
                __sync_synchronize();
            }
            else {
                /* Try spinning first */
                uint32_t count = 0;
                do {
                    count++;
                    if ( count < 1024 ) { sst_pause(); }
                    else if ( count < (1024 * 1024) ) {
                        std::this_thread::yield();
                    }
                    else {
                        struct timespec ts;
                        ts.tv_sec  = 0;
                        ts.tv_nsec = 1000;
                        nanosleep(&ts, nullptr);
                    }
                } while ( gen == generation.load(std::memory_order_acquire) );
            }
            elapsed = SST::Core::Profile::getElapsed(startTime);
        }
        return elapsed;
    }

    void disable()
    {
        enabled.store(false);
        count.store(0);
        ++generation;
    }
};
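
// Usage sketch (illustrative only): each participating thread calls wait() once per
// phase and is released when all of them have arrived. The names `barrier`,
// `numThreads`, and the phase functions below are hypothetical.
//
//   static Barrier barrier(numThreads);
//
//   void workerThread()
//   {
//       computePhase();
//       barrier.wait(); // blocks (spin, then yield, then nanosleep) until all arrive
//       exchangePhase();
//       barrier.wait();
//   }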

#if 0
typedef std::mutex Spinlock;
#else
class Spinlock
{
    std::atomic_flag latch = ATOMIC_FLAG_INIT;
    std::thread::id  thread_id;

public:
    Spinlock() {}

    inline void lock()
    {
        if ( thread_id == std::this_thread::get_id() ) printf("DEADLOCK\n");
        while ( latch.test_and_set(std::memory_order_acquire) ) {
            sst_pause();
#if defined(__PPC64__)
            __sync_synchronize();
#endif
        }
        thread_id = std::this_thread::get_id();
        // std::cout << thread_id << std::endl;
    }

    inline void unlock()
    {
        thread_id = std::thread::id();
        latch.clear(std::memory_order_release);
    }
};
#endif
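
// Usage sketch (illustrative only): Spinlock provides lock()/unlock() and so satisfies
// the BasicLockable requirements, which means it can be used with std::lock_guard just
// as the queues below do. The counter and function names here are hypothetical.
//
//   Spinlock counterLock;
//   uint64_t sharedCounter = 0;
//
//   void bump()
//   {
//       std::lock_guard<Spinlock> guard(counterLock); // busy-waits via sst_pause()
//       ++sharedCounter;
//   }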

class EmptySpinlock
{
public:
    EmptySpinlock() {}
    inline void lock() {}

    inline void unlock() {}
};
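
// Usage sketch (illustrative only): EmptySpinlock is a no-op stand-in with the same
// interface as Spinlock, usable where a template selects the lock type and no real
// locking is needed. The template and names below are hypothetical, not SST types.
//
//   template <typename LockT = Spinlock>
//   class Registry
//   {
//       LockT lock;
//   public:
//       void update()
//       {
//           std::lock_guard<LockT> guard(lock); // compiles to nothing for EmptySpinlock
//           // ... mutate shared state ...
//       }
//   };
//
//   Registry<EmptySpinlock> serialRegistry; // no locking overhead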

template <typename T>
class BoundedQueue
{

    struct cell_t
    {
        std::atomic<size_t> sequence;
        T                   data;
    };

    bool    initialized;
    size_t  dsize;
    cell_t* data;
    CACHE_ALIGNED(std::atomic<size_t>, rPtr);
    CACHE_ALIGNED(std::atomic<size_t>, wPtr);

public:
    // BoundedQueue(size_t maxSize) : dsize(maxSize)
    BoundedQueue(size_t maxSize) : initialized(false) { initialize(maxSize); }

    BoundedQueue() : initialized(false) {}

    void initialize(size_t maxSize)
    {
        if ( initialized ) return;
        dsize = maxSize;
        data  = new cell_t[dsize];
        for ( size_t i = 0; i < maxSize; i++ )
            data[i].sequence.store(i);
        rPtr.store(0);
        wPtr.store(0);
        // fprintf(stderr, "%p %p: %ld\n", &rPtr, &wPtr, ((intptr_t)&wPtr - (intptr_t)&rPtr));
        initialized = true;
    }

    ~BoundedQueue()
    {
        if ( initialized ) delete[] data;
    }

    size_t size() const { return (wPtr.load() - rPtr.load()); }

    bool empty() const { return (rPtr.load() == wPtr.load()); }

    bool try_insert(const T& arg)
    {
        cell_t* cell = nullptr;
        size_t  pos  = wPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell          = &data[pos % dsize];
            size_t   seq  = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if ( 0 == diff ) {
                if ( wPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                // fprintf(stderr, "diff = %ld: %zu - %zu\n", diff, seq, pos);
                return false;
            }
            else {
                pos = wPtr.load(std::memory_order_relaxed);
            }
        }
        cell->data = arg;
        cell->sequence.store(pos + 1, std::memory_order_release);
        return true;
    }

    bool try_remove(T& res)
    {
        cell_t* cell = nullptr;
        size_t  pos  = rPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell          = &data[pos % dsize];
            size_t   seq  = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if ( 0 == diff ) {
                if ( rPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                return false;
            }
            else {
                pos = rPtr.load(std::memory_order_relaxed);
            }
        }
        res = cell->data;
        cell->sequence.store(pos + dsize, std::memory_order_release);
        return true;
    }

    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) { return res; }
            sst_pause();
        }
    }
};
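
// Usage sketch (illustrative only): BoundedQueue is a fixed-capacity queue whose slots
// carry sequence numbers, so multiple producers and consumers can advance wPtr/rPtr with
// compare-and-swap. try_insert() returns false when the queue is full; remove() spins
// until an element is available. The capacity and element type below are hypothetical.
//
//   BoundedQueue<int> queue(1024);
//
//   // producer thread
//   while ( !queue.try_insert(workItem) ) { sst_pause(); } // back off while full
//
//   // consumer thread
//   int item = queue.remove(); // spins until an item arrives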

template <typename T>
class UnboundedQueue
{
    struct CACHE_ALIGNED_T Node
    {
        std::atomic<Node*> next;
        T                  data;

        Node() : next(nullptr) {}
    };

    CACHE_ALIGNED(Node*, first);
    CACHE_ALIGNED(Node*, last);
    CACHE_ALIGNED(Spinlock, consumerLock);
    CACHE_ALIGNED(Spinlock, producerLock);

public:
    UnboundedQueue()
    {
        /* 'first' is a dummy value */
        first = last = new Node();
    }

    ~UnboundedQueue()
    {
        while ( first != nullptr ) { // release the list
            Node* tmp = first;
            first     = tmp->next;
            delete tmp;
        }
    }

    void insert(const T& t)
    {
        Node* tmp = new Node();
        tmp->data = t;
        std::lock_guard<Spinlock> lock(producerLock);
        last->next = tmp; // publish to consumers
        last       = tmp; // swing last forward
    }

    bool try_remove(T& result)
    {
        std::lock_guard<Spinlock> lock(consumerLock);
        Node* theFirst = first;
        Node* theNext  = first->next;
        if ( theNext != nullptr ) {  // if queue is nonempty
            result = theNext->data;  // take it out
            first  = theNext;        // swing first forward
            delete theFirst;         // delete the old dummy
            return true;
        }
        return false;
    }

    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) { return res; }
            sst_pause();
        }
    }
};
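
// Usage sketch (illustrative only): UnboundedQueue is a two-lock linked-list queue
// (separate producer and consumer Spinlocks), so insert() never fails but allocates a
// Node per element. The element type and variable names below are hypothetical.
//
//   UnboundedQueue<Event*> eventQueue;
//
//   // producer thread
//   eventQueue.insert(ev);                            // always succeeds; may allocate
//
//   // consumer thread
//   Event* ev = nullptr;
//   if ( eventQueue.try_remove(ev) ) { handle(ev); }  // non-blocking attempt
//   Event* next = eventQueue.remove();                // or spin until one arrives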

} // namespace ThreadSafe
} // namespace Core
} // namespace SST

#endif // SST_CORE_THREADSAFE_H