threadsafe.h
// Copyright 2009-2025 NTESS. Under the terms
// of Contract DE-NA0003525 with NTESS, the U.S.
// Government retains certain rights in this software.
//
// Copyright (c) 2009-2025, NTESS
// All rights reserved.
//
// This file is part of the SST software package. For license
// information, see the LICENSE file in the top level directory of the
// distribution.

#ifndef SST_CORE_THREADSAFE_H
#define SST_CORE_THREADSAFE_H

#if ( defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) )
#include <x86intrin.h>
#define sst_pause() _mm_pause()
#elif (defined(__arm__) || defined(__arm) || defined(__aarch64__))
#define sst_pause() __asm__ __volatile__("yield")
#elif defined(__PPC64__)
#define sst_pause() __asm__ __volatile__("or 27, 27, 27" ::: "memory");
#else
#define sst_pause() std::this_thread::yield()
#endif

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>
// #include <stdalign.h>

#include "sst/core/profile.h"

#include <time.h>

namespace SST::Core::ThreadSafe {

#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8))
#define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
#define CACHE_ALIGNED_T
#else
#define CACHE_ALIGNED(type, name) alignas(64) type name
#define CACHE_ALIGNED_T alignas(64)
#endif

class CACHE_ALIGNED_T Barrier
{
    size_t              origCount;
    std::atomic<bool>   enabled;
    std::atomic<size_t> count, generation;

public:
    Barrier(size_t count) :
        origCount(count),
        enabled(true),
        count(count),
        generation(0)
    {}

    // Come g++ 4.7, this can become a delegating constructor
    Barrier() :
        origCount(0),
        enabled(false),
        count(0),
        generation(0)
    {}

    /** ONLY call this while nobody is in wait() */
    void resize(size_t newCount)
    {
        count = origCount = newCount;
        generation.store(0);
        enabled.store(true);
    }

    /**
     * Wait for all threads to reach this point.
     * @return 0.0, or elapsed time spent waiting, if configured with --enable-profile
     */
    double wait()
    {
        double elapsed = 0.0;
        if ( enabled ) {
            auto startTime = SST::Core::Profile::now();

            size_t gen = generation.load(std::memory_order_acquire);
            asm("" ::: "memory");
            size_t c = count.fetch_sub(1) - 1;
            if ( 0 == c ) {
                /* We should release */
                count.store(origCount);
                asm("" ::: "memory");
                /* Incrementing generation causes release */
                generation.fetch_add(1, std::memory_order_release);
                __sync_synchronize();
            }
            else {
                /* Try spinning first */
                uint32_t count = 0;
                do {
                    count++;
                    if ( count < 1024 ) {
                        sst_pause();
                    }
                    else if ( count < (1024 * 1024) ) {
                        std::this_thread::yield();
                    }
                    else {
                        struct timespec ts;
                        ts.tv_sec  = 0;
                        ts.tv_nsec = 1000;
                        nanosleep(&ts, nullptr);
                    }
                } while ( gen == generation.load(std::memory_order_acquire) );
            }
            elapsed = SST::Core::Profile::getElapsed(startTime);
        }
        return elapsed;
    }

    void disable()
    {
        enabled.store(false);
        count.store(0);
        ++generation;
    }
};
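
// Illustrative usage sketch (not part of the original header): each worker
// thread calls wait() once per phase, and all threads block until the last
// one arrives. The thread count and the phase loop below are hypothetical.
#if 0
void barrier_example()
{
    const size_t             nThreads = 4;
    Barrier                  barrier(nThreads);
    std::vector<std::thread> workers;
    for ( size_t i = 0; i < nThreads; i++ ) {
        workers.emplace_back([&barrier]() {
            for ( int phase = 0; phase < 3; phase++ ) {
                // ... do per-phase work ...
                barrier.wait(); // rendezvous before starting the next phase
            }
        });
    }
    for ( auto& t : workers )
        t.join();
}
#endif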

#if 0
using Spinlock = std::mutex;
#else
class Spinlock
{
    std::atomic_flag latch = ATOMIC_FLAG_INIT;

public:
    Spinlock() {}

    inline void lock()
    {
        while ( latch.test_and_set(std::memory_order_acquire) ) {
            sst_pause();
#if defined(__PPC64__)
            __sync_synchronize();
#endif
        }
    }

    inline void unlock() { latch.clear(std::memory_order_release); }
};
#endif
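
// Illustrative usage sketch (not part of the original header): Spinlock
// satisfies the BasicLockable requirements, so it can guard a short critical
// section through std::lock_guard. The shared counter below is hypothetical.
#if 0
void spinlock_example()
{
    static Spinlock counterLock;
    static uint64_t counter = 0;
    {
        std::lock_guard<Spinlock> guard(counterLock); // busy-waits until acquired
        ++counter;
    } // unlock() runs when guard goes out of scope
}
#endif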

class EmptySpinlock
{
public:
    EmptySpinlock() {}
    inline void lock() {}

    inline void unlock() {}
};
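
// Illustrative note (not part of the original header): EmptySpinlock is a
// no-op stand-in for Spinlock, handy when a data structure is templated on
// its lock type and locking should compile away in single-threaded use.
// The Counter template below is a hypothetical sketch of that pattern.
#if 0
template <typename LockT>
class Counter
{
    LockT    lock_;
    uint64_t value_ = 0;

public:
    void increment()
    {
        std::lock_guard<LockT> guard(lock_); // no-op when LockT = EmptySpinlock
        ++value_;
    }
};
using ThreadSafeCounter   = Counter<Spinlock>;
using SingleThreadCounter = Counter<EmptySpinlock>;
#endif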

template <typename T>
class BoundedQueue
{

    struct cell_t
    {
        std::atomic<size_t> sequence;
        T                   data;
    };

    bool    initialized;
    size_t  dsize;
    cell_t* data;
    CACHE_ALIGNED(std::atomic<size_t>, rPtr);
    CACHE_ALIGNED(std::atomic<size_t>, wPtr);

public:
    // BoundedQueue(size_t maxSize) : dsize(maxSize)
    explicit BoundedQueue(size_t maxSize) :
        initialized(false)
    {
        initialize(maxSize);
    }

    BoundedQueue() :
        initialized(false)
    {}

    void initialize(size_t maxSize)
    {
        if ( initialized ) return;
        dsize = maxSize;
        data  = new cell_t[dsize];
        for ( size_t i = 0; i < maxSize; i++ )
            data[i].sequence.store(i);
        rPtr.store(0);
        wPtr.store(0);
        // fprintf(stderr, "%p %p: %ld\n", &rPtr, &wPtr, ((intptr_t)&wPtr - (intptr_t)&rPtr));
        initialized = true;
    }

    ~BoundedQueue()
    {
        if ( initialized ) delete[] data;
    }

    size_t size() const { return (wPtr.load() - rPtr.load()); }

    bool empty() const { return (rPtr.load() == wPtr.load()); }

    bool try_insert(const T& arg)
    {
        cell_t* cell = nullptr;
        size_t  pos  = wPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell          = &data[pos % dsize];
            size_t   seq  = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if ( 0 == diff ) {
                if ( wPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                // fprintf(stderr, "diff = %ld: %zu - %zu\n", diff, seq, pos);
                return false;
            }
            else {
                pos = wPtr.load(std::memory_order_relaxed);
            }
        }
        cell->data = arg;
        cell->sequence.store(pos + 1, std::memory_order_release);
        return true;
    }

    bool try_remove(T& res)
    {
        cell_t* cell = nullptr;
        size_t  pos  = rPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell          = &data[pos % dsize];
            size_t   seq  = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if ( 0 == diff ) {
                if ( rPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                return false;
            }
            else {
                pos = rPtr.load(std::memory_order_relaxed);
            }
        }
        res = cell->data;
        cell->sequence.store(pos + dsize, std::memory_order_release);
        return true;
    }

    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
            sst_pause();
        }
    }
};
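
// Illustrative usage sketch (not part of the original header): a fixed-capacity
// multi-producer/multi-consumer ring. try_insert() fails when the ring is full
// and try_remove() fails when it is empty, leaving back-off policy to the
// caller. The capacity and payload type below are hypothetical.
#if 0
void bounded_queue_example()
{
    BoundedQueue<int> queue(1024); // capacity fixed at initialization

    // Producer side: decide what to do when the queue is full.
    if ( !queue.try_insert(42) ) {
        // queue full; caller chooses to retry, spin, or discard
    }

    // Consumer side: non-blocking poll...
    int value;
    if ( queue.try_remove(value) ) {
        // ... got an item
    }

    // ...or blocking removal, which spins with sst_pause() until data arrives.
    // int next = queue.remove();
}
#endif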

template <typename T>
class UnboundedQueue
{
    struct CACHE_ALIGNED_T Node
    {
        std::atomic<Node*> next;
        T                  data;

        Node() :
            next(nullptr)
        {}
    };

    CACHE_ALIGNED(Node*, first);
    CACHE_ALIGNED(Node*, last);
    CACHE_ALIGNED(Spinlock, consumerLock);
    CACHE_ALIGNED(Spinlock, producerLock);

public:
    UnboundedQueue()
    {
        /* 'first' is a dummy value */
        first = last = new Node();
    }

    ~UnboundedQueue()
    {
        while ( first != nullptr ) { // release the list
            Node* tmp = first;
            first     = tmp->next;
            delete tmp;
        }
    }

    void insert(const T& t)
    {
        Node* tmp = new Node();
        tmp->data = t;
        std::lock_guard<Spinlock> lock(producerLock);
        last->next = tmp; // publish to consumers
        last       = tmp; // swing last forward
    }

    bool try_remove(T& result)
    {
        std::lock_guard<Spinlock> lock(consumerLock);
        Node* theFirst = first;
        Node* theNext  = first->next;
        if ( theNext != nullptr ) {  // if queue is nonempty
            result = theNext->data;  // take it out
            first  = theNext;        // swing first forward
            delete theFirst;         // delete the old dummy
            return true;
        }
        return false;
    }

    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
            sst_pause();
        }
    }
};
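
// Illustrative usage sketch (not part of the original header): an unbounded
// linked-list queue guarded by separate producer and consumer spinlocks, so
// an insert and a remove can proceed concurrently. The names below are
// hypothetical.
#if 0
void unbounded_queue_example()
{
    UnboundedQueue<int> queue;

    queue.insert(7); // never fails; allocates a new node

    int item;
    if ( queue.try_remove(item) ) {
        // ... handle item
    }

    // remove() blocks (spinning with sst_pause()) until an item is available.
    // int next = queue.remove();
}
#endif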

// Replace with std::atomic_fetch_max at C++26
template <typename T>
void
atomic_fetch_max(std::atomic<T>& max_value, T const& new_value) noexcept
{
    T old_value = max_value;
    while ( old_value < new_value && !max_value.compare_exchange_weak(old_value, new_value) ) {}
}

// Replace with std::atomic_fetch_min at C++26
template <typename T>
void
atomic_fetch_min(std::atomic<T>& min_value, T const& new_value) noexcept
{
    T old_value = min_value;
    while ( old_value > new_value && !min_value.compare_exchange_weak(old_value, new_value) ) {}
}
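
// Illustrative usage sketch (not part of the original header): these helpers
// atomically raise (or lower) a shared value via a compare-exchange loop,
// e.g. to track the maximum observed latency across threads. The variable
// names below are hypothetical.
#if 0
void fetch_max_example()
{
    static std::atomic<uint64_t> maxLatencyNs(0);
    uint64_t observedNs = 1250;                 // measured by the calling thread
    atomic_fetch_max(maxLatencyNs, observedNs); // updates only if observedNs is larger
}
#endif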

} // namespace SST::Core::ThreadSafe

#endif // SST_CORE_THREADSAFE_H