SST 11.0.0 - Structural Simulation Toolkit
threadsafe.h
// Copyright 2009-2021 NTESS. Under the terms
// of Contract DE-NA0003525 with NTESS, the U.S.
// Government retains certain rights in this software.
//
// Copyright (c) 2009-2021, NTESS
// All rights reserved.
//
// This file is part of the SST software package. For license
// information, see the LICENSE file in the top level directory of the
// distribution.

#ifndef SST_CORE_CORE_THREADSAFE_H
#define SST_CORE_CORE_THREADSAFE_H

#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
#include <x86intrin.h>
#define sst_pause() _mm_pause()
#elif ( defined(__arm__) || defined(__arm) || defined(__aarch64__) )
#define sst_pause() __asm__ __volatile__ ("yield")
#elif defined(__PPC64__)
#define sst_pause() __asm__ __volatile__ ( "or 27, 27, 27" ::: "memory" )
#else
// Fallback for architectures without a dedicated spin-wait hint instruction
#define sst_pause()
#endif

#include <thread>
#include <atomic>
#include <condition_variable>
#include <mutex>

#include <vector>
//#include <stdalign.h>

#include <time.h>

#include "sst/core/profile.h"

namespace SST {
namespace Core {
namespace ThreadSafe {

#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8 ))
# define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
# define CACHE_ALIGNED_T
#else
# define CACHE_ALIGNED(type, name) alignas(64) type name
# define CACHE_ALIGNED_T alignas(64)
#endif

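/* Illustrative sketch (not part of the upstream header): these macros pad hot
 * shared variables out to their own 64-byte cache line so that updates to one
 * field do not invalidate the cache line holding its neighbor (false sharing).
 * The struct and field names below are invented for the example.
 *
 *     struct Counters {
 *         CACHE_ALIGNED(std::atomic<size_t>, produced);   // own cache line
 *         CACHE_ALIGNED(std::atomic<size_t>, consumed);   // own cache line
 *     };
 *     static_assert(alignof(Counters) == 64, "members start on cache-line boundaries");
 */
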
class CACHE_ALIGNED_T Barrier {
    size_t origCount;
    std::atomic<bool> enabled;
    std::atomic<size_t> count, generation;
public:
    Barrier(size_t count) : origCount(count), enabled(true),
            count(count), generation(0)
    { }

    // As of g++ 4.7, this can become a delegating constructor
    Barrier() : origCount(0), enabled(false), count(0), generation(0)
    { }


    /** ONLY call this while nobody is in wait() */
    void resize(size_t newCount)
    {
        count = origCount = newCount;
        generation.store(0);
        enabled.store(true);
    }


    /**
     * Wait for all threads to reach this point.
     * @return 0.0, or elapsed time spent waiting, if configured with --enable-profile
     */
    double wait()
    {
        double elapsed = 0.0;
        if ( enabled ) {
            auto startTime = SST::Core::Profile::now();

            size_t gen = generation.load(std::memory_order_acquire);
            asm("" ::: "memory");
            size_t c = count.fetch_sub(1) - 1;
            if ( 0 == c ) {
                /* We should release */
                count.store(origCount);
                asm("" ::: "memory");
                /* Incrementing generation causes release */
                generation.fetch_add(1, std::memory_order_release);
                __sync_synchronize();
            } else {
                /* Try spinning first */
                uint32_t count = 0;
                do {
                    count++;
                    if ( count < 1024 ) {
                        sst_pause();
                    } else if ( count < (1024*1024) ) {
                        std::this_thread::yield();
                    } else {
                        struct timespec ts;
                        ts.tv_sec = 0;
                        ts.tv_nsec = 1000;
                        nanosleep(&ts, nullptr);
                    }
                } while ( gen == generation.load(std::memory_order_acquire) );
            }
            elapsed = SST::Core::Profile::getElapsed(startTime);
        }
        return elapsed;
    }

    void disable()
    {
        enabled.store(false);
        count.store(0);
        ++generation;
    }
};

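/* Usage sketch (illustration only, not from the upstream sources): a phase
 * barrier shared by a fixed pool of worker threads. The thread count and the
 * per-phase work functions are invented for the example.
 *
 *     static SST::Core::ThreadSafe::Barrier phaseBarrier(4);   // 4 workers
 *
 *     void workerLoop()
 *     {
 *         for ( int phase = 0; phase < 10; phase++ ) {
 *             computeLocalPhase(phase);   // hypothetical per-thread work
 *             phaseBarrier.wait();        // no thread starts phase+1 early
 *         }
 *     }
 *
 * resize() and disable() must only be called while no thread is inside wait(),
 * for example between phases when all workers are known to be parked elsewhere.
 */
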
#if 0
typedef std::mutex Spinlock;
#else
class Spinlock {
    std::atomic_flag latch = ATOMIC_FLAG_INIT;
public:
    Spinlock() { }

    inline void lock() {
        while ( latch.test_and_set(std::memory_order_acquire) ) {
            sst_pause();
#if defined(__PPC64__)
            __sync_synchronize();
#endif
        }
    }

    inline void unlock() {
        latch.clear(std::memory_order_release);
    }
};
#endif


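/* Usage sketch (illustration only): Spinlock satisfies the BasicLockable
 * requirements (lock()/unlock()), so it can guard short critical sections
 * through std::lock_guard exactly like std::mutex. The counter being
 * protected is invented for the example.
 *
 *     static SST::Core::ThreadSafe::Spinlock counterLock;
 *     static uint64_t sharedCounter = 0;
 *
 *     void bumpCounter()
 *     {
 *         std::lock_guard<SST::Core::ThreadSafe::Spinlock> guard(counterLock);
 *         ++sharedCounter;   // keep the critical section short: waiters spin
 *     }
 */
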
template<typename T>
class BoundedQueue {

    struct cell_t {
        std::atomic<size_t> sequence;
        T data;
    };

    bool initialized;
    size_t dsize;
    cell_t *data;
    CACHE_ALIGNED(std::atomic<size_t>, rPtr);
    CACHE_ALIGNED(std::atomic<size_t>, wPtr);

public:
    // BoundedQueue(size_t maxSize) : dsize(maxSize)
    BoundedQueue(size_t maxSize) : initialized(false)
    {
        initialize(maxSize);
    }

    BoundedQueue() : initialized(false) {}

    void initialize(size_t maxSize) {
        if ( initialized ) return;
        dsize = maxSize;
        data = new cell_t[dsize];
        for ( size_t i = 0 ; i < maxSize ; i++ )
            data[i].sequence.store(i);
        rPtr.store(0);
        wPtr.store(0);
        //fprintf(stderr, "%p %p: %ld\n", &rPtr, &wPtr, ((intptr_t)&wPtr - (intptr_t)&rPtr));
        initialized = true;
    }

    ~BoundedQueue()
    {
        if ( initialized ) delete [] data;
    }

    size_t size() const
    {
        return (wPtr.load() - rPtr.load());
    }

    bool empty() const
    {
        return (rPtr.load() == wPtr.load());
    }

    bool try_insert(const T& arg)
    {
        cell_t *cell = nullptr;
        size_t pos = wPtr.load(std::memory_order_relaxed);
        for (;;) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if ( 0 == diff ) {
                if ( wPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
                    break;
            } else if ( 0 > diff ) {
                // fprintf(stderr, "diff = %ld: %zu - %zu\n", diff, seq, pos);
                return false;
            } else {
                pos = wPtr.load(std::memory_order_relaxed);
            }
        }
        cell->data = arg;
        cell->sequence.store(pos+1, std::memory_order_release);
        return true;
    }

    bool try_remove(T &res)
    {
        cell_t *cell = nullptr;
        size_t pos = rPtr.load(std::memory_order_relaxed);
        for (;;) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if ( 0 == diff ) {
                if ( rPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
                    break;
            } else if ( 0 > diff ) {
                return false;
            } else {
                pos = rPtr.load(std::memory_order_relaxed);
            }
        }
        res = cell->data;
        cell->sequence.store(pos + dsize, std::memory_order_release);
        return true;
    }

    T remove() {
        while(1) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
            sst_pause();
        }
    }
};

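/* Usage sketch (illustration only): a fixed-capacity multi-producer,
 * multi-consumer queue of work items. The element type and capacity below are
 * invented for the example; the queue never grows, so try_insert() returns
 * false when it is full.
 *
 *     SST::Core::ThreadSafe::BoundedQueue<int> workQueue(1024);
 *
 *     // producer thread
 *     if ( !workQueue.try_insert(42) ) {
 *         // queue full: caller decides whether to retry, drop, or back off
 *     }
 *
 *     // consumer thread
 *     int item = workQueue.remove();   // spins (with sst_pause) until an item arrives
 */
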
template<typename T>
class UnboundedQueue {
    struct CACHE_ALIGNED_T Node {
        std::atomic<Node*> next;
        T data;

        Node() : next(nullptr) { }
    };

    CACHE_ALIGNED(Node*, first);
    CACHE_ALIGNED(Node*, last);
    CACHE_ALIGNED(Spinlock, consumerLock);
    CACHE_ALIGNED(Spinlock, producerLock);

public:
    UnboundedQueue() {
        /* 'first' is a dummy value */
        first = last = new Node();
    }

    ~UnboundedQueue() {
        while ( first != nullptr ) {   // release the list
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }

    void insert(const T& t) {
        Node* tmp = new Node();
        tmp->data = t;
        std::lock_guard<Spinlock> lock(producerLock);
        last->next = tmp;              // publish to consumers
        last = tmp;                    // swing last forward
    }

    bool try_remove( T& result) {
        std::lock_guard<Spinlock> lock(consumerLock);
        Node* theFirst = first;
        Node* theNext = first->next;
        if ( theNext != nullptr ) {    // if queue is nonempty
            result = theNext->data;    // take it out
            first = theNext;           // swing first forward
            delete theFirst;           // delete the old dummy
            return true;
        }
        return false;
    }

    T remove() {
        while(1) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
            sst_pause();
        }
    }

};

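/* Usage sketch (illustration only): a two-lock linked-list queue that grows on
 * demand, so insert() never fails for lack of space. The element type and the
 * handler below are invented for the example.
 *
 *     SST::Core::ThreadSafe::UnboundedQueue<int> mailbox;
 *
 *     // producer thread
 *     mailbox.insert(42);               // allocates a node; never blocks consumers
 *
 *     // consumer thread
 *     int msg;
 *     if ( mailbox.try_remove(msg) ) {  // non-blocking: returns false when empty
 *         handleMessage(msg);           // hypothetical handler
 *     }
 */
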
} // namespace ThreadSafe
} // namespace Core
} // namespace SST

#endif // SST_CORE_CORE_THREADSAFE_H