SST 16.0.0
Structural Simulation Toolkit
threadsafe.h
1// Copyright 2009-2026 NTESS. Under the terms
2// of Contract DE-NA0003525 with NTESS, the U.S.
3// Government retains certain rights in this software.
4//
5// Copyright (c) 2009-2026, NTESS
6// All rights reserved.
7//
8// This file is part of the SST software package. For license
9// information, see the LICENSE file in the top level directory of the
10// distribution.
11
12#ifndef SST_CORE_THREADSAFE_H
13#define SST_CORE_THREADSAFE_H
14
// sst_pause(): back-off primitive called on each iteration of the spin-wait
// loops in this file. It expands to the x86 `pause` intrinsic, the ARM `yield`
// instruction, a PPC64 hint instruction, or std::this_thread::yield() elsewhere.
// Every variant expands to a single expression WITHOUT a trailing semicolon, so
// `sst_pause();` is exactly one statement at every call site. An extra ';' would
// break an unbraced `if (...) sst_pause(); else ...`.
#if ( defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) )
#include <x86intrin.h>
#define sst_pause() _mm_pause()
#elif ( defined(__arm__) || defined(__arm) || defined(__aarch64__) )
#define sst_pause() __asm__ __volatile__("yield")
#elif defined(__PPC64__)
// NOTE(review): "or 27,27,27" is presumably the Power ISA `yield` hint -- confirm.
#define sst_pause() __asm__ __volatile__("or 27, 27, 27" ::: "memory")
#else
// Fallback: <thread> must be included before sst_pause() is expanded.
#define sst_pause() std::this_thread::yield()
#endif
25
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
// #include <stdalign.h>

#include "sst/core/profile.h"

#include <time.h>
38
39namespace SST::Core::ThreadSafe {
40
// 64-byte alignment helpers:
//   CACHE_ALIGNED(type, name) declares member/variable `name` of `type` aligned to 64 bytes.
//   CACHE_ALIGNED_T is written after the class-key (`class CACHE_ALIGNED_T Foo`)
//   to align a whole class/struct to 64 bytes.
//
// The legacy branch is meant only for real GCC < 4.8, which lacked alignas.
// Clang must be excluded: for compatibility it also defines __GNUC__ == 4 and
// __GNUC_MINOR__ == 2. Without the check, clang took this branch and got an
// empty CACHE_ALIGNED_T, silently leaving such classes unaligned.
#if defined(__GNUC__) && !defined(__clang__) && ( __GNUC__ == 4 && __GNUC_MINOR__ < 8 )
#define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
#define CACHE_ALIGNED_T
#else
#define CACHE_ALIGNED(type, name) alignas(64) type name
#define CACHE_ALIGNED_T alignas(64)
#endif
48
49class CACHE_ALIGNED_T Barrier
50{
51 size_t origCount;
52 std::atomic<bool> enabled;
53 std::atomic<size_t> count, generation;
54 std::mutex barrierMutex;
55
56public:
57 Barrier(size_t count) :
58 origCount(count),
59 enabled(true),
60 count(count),
61 generation(0)
62 {}
63
64 // Come g++ 4.7, this can become a delegating constructor
65 Barrier() :
66 origCount(0),
67 enabled(false),
68 count(0),
69 generation(0)
70 {}
71
72 /** ONLY call this while nobody is in wait() */
73 void resize(size_t newCount)
74 {
75 std::lock_guard<std::mutex> lk(barrierMutex);
76 count = origCount = newCount;
77 generation.store(0);
78 enabled.store(true);
79 }
80
81 /**
82 * Wait for all threads to reach this point.
83 * @return 0.0, or elapsed time spent waiting, if configured with --enable-profile
84 */
85 double wait()
86 {
87 double elapsed = 0.0;
88 if ( enabled ) {
89 auto startTime = SST::Core::Profile::now();
90
91 size_t gen = generation.load(std::memory_order_acquire);
92 asm("" ::: "memory");
93 size_t c = count.fetch_sub(1) - 1;
94 if ( 0 == c ) {
95 /* We should release */
96 count.store(origCount);
97 asm("" ::: "memory");
98 /* Incrementing generation causes release */
99 generation.fetch_add(1, std::memory_order_release);
100 __sync_synchronize();
101 }
102 else {
103 /* Try spinning first */
104 uint32_t count = 0;
105 do {
106 count++;
107 if ( count < 1024 ) {
108 sst_pause();
109 }
110 else if ( count < (1024 * 1024) ) {
111 std::this_thread::yield();
112 }
113 else {
114 struct timespec ts;
115 ts.tv_sec = 0;
116 ts.tv_nsec = 1000;
117 nanosleep(&ts, nullptr);
118 }
119 } while ( gen == generation.load(std::memory_order_acquire) );
120 }
121 elapsed = SST::Core::Profile::getElapsed(startTime);
122 }
123 return elapsed;
124 }
125
126 void disable()
127 {
128 enabled.store(false);
129 count.store(0);
130 ++generation;
131 }
132};
133
134#if 0
135using Spinlock = std::mutex;
136#else
137class Spinlock
138{
139 std::atomic_flag latch = ATOMIC_FLAG_INIT;
140
141public:
142 Spinlock() {}
143
144 inline void lock()
145 {
146 while ( latch.test_and_set(std::memory_order_acquire) ) {
147 sst_pause();
148#if defined(__PPC64__)
149 __sync_synchronize();
150#endif
151 }
152 }
153
154 inline void unlock() { latch.clear(std::memory_order_release); }
155};
156#endif
157
/**
 * No-op stand-in for Spinlock: lock() and unlock() do nothing.
 *
 * NOTE(review): presumably used where a lock type is required by the interface
 * but no mutual exclusion is needed. The call sites are not visible here.
 */
class EmptySpinlock
{
public:
    EmptySpinlock() {}

    /** Intentionally empty. */
    inline void lock()
    {
    }

    /** Intentionally empty. */
    inline void unlock()
    {
    }
};
166
167template <typename T>
168class BoundedQueue
169{
170
171 struct cell_t
172 {
173 std::atomic<size_t> sequence;
174 T data;
175 };
176
177 bool initialized;
178 size_t dsize;
179 cell_t* data;
180 CACHE_ALIGNED(std::atomic<size_t>, rPtr);
181 CACHE_ALIGNED(std::atomic<size_t>, wPtr);
182
183public:
184 // BoundedQueue(size_t maxSize) : dsize(maxSize)
185 explicit BoundedQueue(size_t maxSize) :
186 initialized(false)
187 {
188 initialize(maxSize);
189 }
190
191 BoundedQueue() :
192 initialized(false)
193 {}
194
195 void initialize(size_t maxSize)
196 {
197 if ( initialized ) return;
198 dsize = maxSize;
199 data = new cell_t[dsize];
200 for ( size_t i = 0; i < maxSize; i++ )
201 data[i].sequence.store(i);
202 rPtr.store(0);
203 wPtr.store(0);
204 // fprintf(stderr, "%p %p: %ld\n", &rPtr, &wPtr, ((intptr_t)&wPtr - (intptr_t)&rPtr));
205 initialized = true;
206 }
207
208 ~BoundedQueue()
209 {
210 if ( initialized ) delete[] data;
211 }
212
213 size_t size() const { return (wPtr.load() - rPtr.load()); }
214
215 bool empty() const { return (rPtr.load() == wPtr.load()); }
216
217 bool try_insert(const T& arg)
218 {
219 cell_t* cell = nullptr;
220 size_t pos = wPtr.load(std::memory_order_relaxed);
221 for ( ;; ) {
222 cell = &data[pos % dsize];
223 size_t seq = cell->sequence.load(std::memory_order_acquire);
224 intptr_t diff = (intptr_t)seq - (intptr_t)pos;
225 if ( 0 == diff ) {
226 if ( wPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
227 }
228 else if ( 0 > diff ) {
229 // fprintf(stderr, "diff = %ld: %zu - %zu\n", diff, seq, pos);
230 return false;
231 }
232 else {
233 pos = wPtr.load(std::memory_order_relaxed);
234 }
235 }
236 cell->data = arg;
237 cell->sequence.store(pos + 1, std::memory_order_release);
238 return true;
239 }
240
241 bool try_remove(T& res)
242 {
243 cell_t* cell = nullptr;
244 size_t pos = rPtr.load(std::memory_order_relaxed);
245 for ( ;; ) {
246 cell = &data[pos % dsize];
247 size_t seq = cell->sequence.load(std::memory_order_acquire);
248 intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
249 if ( 0 == diff ) {
250 if ( rPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
251 }
252 else if ( 0 > diff ) {
253 return false;
254 }
255 else {
256 pos = rPtr.load(std::memory_order_relaxed);
257 }
258 }
259 res = cell->data;
260 cell->sequence.store(pos + dsize, std::memory_order_release);
261 return true;
262 }
263
264 T remove()
265 {
266 while ( 1 ) {
267 T res;
268 if ( try_remove(res) ) {
269 return res;
270 }
271 sst_pause();
272 }
273 }
274};
275
276template <typename T>
277class UnboundedQueue
278{
279 struct CACHE_ALIGNED_T Node
280 {
281 std::atomic<Node*> next;
282 T data;
283
284 Node() :
285 next(nullptr)
286 {}
287 };
288
289 CACHE_ALIGNED(Node*, first);
290 CACHE_ALIGNED(Node*, last);
291 CACHE_ALIGNED(Spinlock, consumerLock);
292 CACHE_ALIGNED(Spinlock, producerLock);
293
294public:
295 UnboundedQueue()
296 {
297 /* 'first' is a dummy value */
298 first = last = new Node();
299 }
300
301 ~UnboundedQueue()
302 {
303 while ( first != nullptr ) { // release the list
304 Node* tmp = first;
305 first = tmp->next;
306 delete tmp;
307 }
308 }
309
310 void insert(const T& t)
311 {
312 Node* tmp = new Node();
313 tmp->data = t;
314 std::lock_guard<Spinlock> lock(producerLock);
315 last->next = tmp; // publish to consumers
316 last = tmp; // swing last forward
317 }
318
319 bool try_remove(T& result)
320 {
321 std::lock_guard<Spinlock> lock(consumerLock);
322 Node* theFirst = first;
323 Node* theNext = first->next;
324 if ( theNext != nullptr ) { // if queue is nonempty
325 result = theNext->data; // take it out
326 first = theNext; // swing first forward
327 delete theFirst; // delete the old dummy
328 return true;
329 }
330 return false;
331 }
332
333 T remove()
334 {
335 while ( 1 ) {
336 T res;
337 if ( try_remove(res) ) {
338 return res;
339 }
340 sst_pause();
341 }
342 }
343};
344
// Replace with std::atomic_fetch_max at C++26
/**
 * Atomically raise @p max_value to @p new_value if @p new_value is greater;
 * otherwise leave it unchanged.
 *
 * The value parameter has type `typename std::atomic<T>::value_type`, as in
 * C++26 std::atomic_fetch_max. This is a non-deduced context, so T is deduced
 * from the atomic alone. A plain int literal can therefore be passed for e.g.
 * std::atomic<long long> instead of failing template deduction.
 *
 * @param max_value atomic to update
 * @param new_value candidate maximum
 */
template <typename T>
void
atomic_fetch_max(std::atomic<T>& max_value, typename std::atomic<T>::value_type const& new_value) noexcept
{
    T old_value = max_value.load();
    // compare_exchange_weak refreshes old_value on failure (including spurious
    // failure), so the loop ends once the stored value is >= new_value or our
    // store succeeds.
    while ( old_value < new_value && !max_value.compare_exchange_weak(old_value, new_value) ) {}
}
353
// Replace with std::atomic_fetch_min at C++26
/**
 * Atomically lower @p min_value to @p new_value if @p new_value is smaller;
 * otherwise leave it unchanged.
 *
 * The value parameter has type `typename std::atomic<T>::value_type`, as in
 * C++26 std::atomic_fetch_min. This is a non-deduced context, so T is deduced
 * from the atomic alone. A plain int literal can therefore be passed for e.g.
 * std::atomic<long long> instead of failing template deduction.
 *
 * @param min_value atomic to update
 * @param new_value candidate minimum
 */
template <typename T>
void
atomic_fetch_min(std::atomic<T>& min_value, typename std::atomic<T>::value_type const& new_value) noexcept
{
    T old_value = min_value.load();
    // compare_exchange_weak refreshes old_value on failure (including spurious
    // failure), so the loop ends once the stored value is <= new_value or our
    // store succeeds.
    while ( old_value > new_value && !min_value.compare_exchange_weak(old_value, new_value) ) {}
}
362
363} // namespace SST::Core::ThreadSafe
364
365#endif // SST_CORE_THREADSAFE_H
double wait()
Wait for all threads to reach this point.
Definition threadsafe.h:85
void resize(size_t newCount)
ONLY call this while nobody is in wait().
Definition threadsafe.h:73
Definition threadsafe.h:138