SST 15.0
Structural Simulation Toolkit
threadsafe.h
// Copyright 2009-2025 NTESS. Under the terms
// of Contract DE-NA0003525 with NTESS, the U.S.
// Government retains certain rights in this software.
//
// Copyright (c) 2009-2025, NTESS
// All rights reserved.
//
// This file is part of the SST software package. For license
// information, see the LICENSE file in the top level directory of the
// distribution.

#ifndef SST_CORE_THREADSAFE_H
#define SST_CORE_THREADSAFE_H

#if ( defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) )
#include <x86intrin.h>
#define sst_pause() _mm_pause()
#elif (defined(__arm__) || defined(__arm) || defined(__aarch64__))
#define sst_pause() __asm__ __volatile__("yield")
#elif defined(__PPC64__)
#define sst_pause() __asm__ __volatile__("or 27, 27, 27" ::: "memory");
#endif
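
// sst_pause() is the architecture's spin-wait hint: PAUSE on x86, YIELD on ARM,
// and the low-priority "or 27, 27, 27" hint on PPC64. Note there is no fallback
// branch above, so on other architectures any use of sst_pause() will fail to
// compile unless a definition is supplied elsewhere.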

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
// #include <stdalign.h>

#include "sst/core/profile.h"

#include <time.h>

namespace SST::Core::ThreadSafe {

#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8))
#define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
#define CACHE_ALIGNED_T
#else
#define CACHE_ALIGNED(type, name) alignas(64) type name
#define CACHE_ALIGNED_T alignas(64)
#endif
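
// For example, on a current compiler
//     CACHE_ALIGNED(std::atomic<size_t>, head);
// expands to
//     alignas(64) std::atomic<size_t> head;
// placing the member ('head' is just an illustrative name) on its own 64-byte
// cache line, which is intended to reduce false sharing between threads.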

class CACHE_ALIGNED_T Barrier
{
    size_t origCount;
    std::atomic<bool> enabled;
    std::atomic<size_t> count, generation;

public:
    Barrier(size_t count) :
        origCount(count),
        enabled(true),
        count(count),
        generation(0)
    {}

    // Come g++ 4.7, this can become a delegating constructor
    Barrier() :
        origCount(0),
        enabled(false),
        count(0),
        generation(0)
    {}

    /** ONLY call this while nobody is in wait() */
    void resize(size_t newCount)
    {
        count = origCount = newCount;
        generation.store(0);
        enabled.store(true);
    }

    /**
     * Wait for all threads to reach this point.
     * @return Elapsed time spent waiting, if SST was configured with --enable-profile; 0.0 otherwise.
     */
    double wait()
    {
        double elapsed = 0.0;
        if ( enabled ) {
            auto startTime = SST::Core::Profile::now();

            size_t gen = generation.load(std::memory_order_acquire);
            asm("" ::: "memory");
            size_t c = count.fetch_sub(1) - 1;
            if ( 0 == c ) {
                /* We should release */
                count.store(origCount);
                asm("" ::: "memory");
                /* Incrementing generation causes release */
                generation.fetch_add(1, std::memory_order_release);
                __sync_synchronize();
            }
            else {
                /* Try spinning first */
                uint32_t count = 0;
                do {
                    count++;
                    if ( count < 1024 ) {
                        sst_pause();
                    }
                    else if ( count < (1024 * 1024) ) {
                        std::this_thread::yield();
                    }
                    else {
                        struct timespec ts;
                        ts.tv_sec = 0;
                        ts.tv_nsec = 1000;
                        nanosleep(&ts, nullptr);
                    }
                } while ( gen == generation.load(std::memory_order_acquire) );
            }
            elapsed = SST::Core::Profile::getElapsed(startTime);
        }
        return elapsed;
    }

    void disable()
    {
        enabled.store(false);
        count.store(0);
        ++generation;
    }
};
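
// A minimal usage sketch (illustrative only; numThreads, numPhases, and
// doPhaseWork are placeholder names): each of N worker threads calls wait()
// at the end of a phase, and the barrier releases all of them once the Nth
// thread arrives.
//
//     SST::Core::ThreadSafe::Barrier phaseBarrier(numThreads);
//
//     void workerLoop(int tid)
//     {
//         for ( int phase = 0; phase < numPhases; ++phase ) {
//             doPhaseWork(tid, phase);
//             phaseBarrier.wait(); // blocks until all numThreads threads arrive
//         }
//     }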

#if 0
using Spinlock = std::mutex;
#else
class Spinlock
{
    std::atomic_flag latch = ATOMIC_FLAG_INIT;

public:
    Spinlock() {}

    inline void lock()
    {
        while ( latch.test_and_set(std::memory_order_acquire) ) {
            sst_pause();
#if defined(__PPC64__)
            __sync_synchronize();
#endif
        }
    }

    inline void unlock() { latch.clear(std::memory_order_release); }
};
#endif
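
// A minimal usage sketch (illustrative only; counterLock and sharedCounter are
// placeholder names): Spinlock provides lock()/unlock(), so it satisfies the
// BasicLockable requirements and can be used with std::lock_guard just like
// std::mutex.
//
//     SST::Core::ThreadSafe::Spinlock counterLock;
//     uint64_t sharedCounter = 0;
//
//     void bump()
//     {
//         std::lock_guard<SST::Core::ThreadSafe::Spinlock> guard(counterLock);
//         ++sharedCounter;
//     }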

class EmptySpinlock
{
public:
    EmptySpinlock() {}
    inline void lock() {}

    inline void unlock() {}
};

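// BoundedQueue is a fixed-capacity, multi-producer/multi-consumer queue. Each
// cell carries a sequence number that producers and consumers compare against
// their ticket (wPtr/rPtr) to decide whether the cell is free, occupied, or
// still being written; this appears to follow the well-known bounded MPMC
// queue design based on per-cell sequence counters.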
template <typename T>
class BoundedQueue
{

    struct cell_t
    {
        std::atomic<size_t> sequence;
        T data;
    };

    bool initialized;
    size_t dsize;
    cell_t* data;
    CACHE_ALIGNED(std::atomic<size_t>, rPtr);
    CACHE_ALIGNED(std::atomic<size_t>, wPtr);

public:
    // BoundedQueue(size_t maxSize) : dsize(maxSize)
    explicit BoundedQueue(size_t maxSize) :
        initialized(false)
    {
        initialize(maxSize);
    }

    BoundedQueue() :
        initialized(false)
    {}

    void initialize(size_t maxSize)
    {
        if ( initialized ) return;
        dsize = maxSize;
        data = new cell_t[dsize];
        for ( size_t i = 0; i < maxSize; i++ )
            data[i].sequence.store(i);
        rPtr.store(0);
        wPtr.store(0);
        // fprintf(stderr, "%p %p: %ld\n", &rPtr, &wPtr, ((intptr_t)&wPtr - (intptr_t)&rPtr));
        initialized = true;
    }

    ~BoundedQueue()
    {
        if ( initialized ) delete[] data;
    }

    size_t size() const { return (wPtr.load() - rPtr.load()); }

    bool empty() const { return (rPtr.load() == wPtr.load()); }

    bool try_insert(const T& arg)
    {
        cell_t* cell = nullptr;
        size_t pos = wPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if ( 0 == diff ) {
                if ( wPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                // fprintf(stderr, "diff = %ld: %zu - %zu\n", diff, seq, pos);
                return false;
            }
            else {
                pos = wPtr.load(std::memory_order_relaxed);
            }
        }
        cell->data = arg;
        cell->sequence.store(pos + 1, std::memory_order_release);
        return true;
    }

    bool try_remove(T& res)
    {
        cell_t* cell = nullptr;
        size_t pos = rPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if ( 0 == diff ) {
                if ( rPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                return false;
            }
            else {
                pos = rPtr.load(std::memory_order_relaxed);
            }
        }
        res = cell->data;
        cell->sequence.store(pos + dsize, std::memory_order_release);
        return true;
    }

    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
            sst_pause();
        }
    }
};
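
// A minimal usage sketch (illustrative only; 'q' and 'item' are placeholder
// names): construct the queue with a fixed capacity, then producers call
// try_insert() and consumers call try_remove() or remove() from any thread.
//
//     SST::Core::ThreadSafe::BoundedQueue<int> q(1024);
//
//     // producer thread
//     if ( !q.try_insert(42) ) {
//         // queue was full; the caller decides whether to retry or drop
//     }
//
//     // consumer thread
//     int item;
//     if ( q.try_remove(item) ) {
//         // use item
//     }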

template <typename T>
class UnboundedQueue
{
    struct CACHE_ALIGNED_T Node
    {
        std::atomic<Node*> next;
        T data;

        Node() :
            next(nullptr)
        {}
    };

    CACHE_ALIGNED(Node*, first);
    CACHE_ALIGNED(Node*, last);
    CACHE_ALIGNED(Spinlock, consumerLock);
    CACHE_ALIGNED(Spinlock, producerLock);

public:
    UnboundedQueue()
    {
        /* 'first' is a dummy value */
        first = last = new Node();
    }

    ~UnboundedQueue()
    {
        while ( first != nullptr ) { // release the list
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }

    void insert(const T& t)
    {
        Node* tmp = new Node();
        tmp->data = t;
        std::lock_guard<Spinlock> lock(producerLock);
        last->next = tmp; // publish to consumers
        last = tmp;       // swing last forward
    }

    bool try_remove(T& result)
    {
        std::lock_guard<Spinlock> lock(consumerLock);
        Node* theFirst = first;
        Node* theNext = first->next;
        if ( theNext != nullptr ) { // if queue is nonempty
            result = theNext->data; // take it out
            first = theNext;        // swing first forward
            delete theFirst;        // delete the old dummy
            return true;
        }
        return false;
    }

    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
            sst_pause();
        }
    }
};
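
// A minimal usage sketch (illustrative only; 'messages' and 'msg' are
// placeholder names): this is a two-lock queue, so producers contend only on
// producerLock and consumers only on consumerLock.
//
//     SST::Core::ThreadSafe::UnboundedQueue<std::string> messages;
//
//     // producer thread
//     messages.insert("hello");
//
//     // consumer thread
//     std::string msg;
//     if ( messages.try_remove(msg) ) {
//         // process msg
//     }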

// Replace with std::atomic_fetch_max at C++26
template <typename T>
void
atomic_fetch_max(std::atomic<T>& max_value, T const& new_value) noexcept
{
    T old_value = max_value;
    while ( old_value < new_value && !max_value.compare_exchange_weak(old_value, new_value) ) {}
}

// Replace with std::atomic_fetch_min at C++26
template <typename T>
void
atomic_fetch_min(std::atomic<T>& min_value, T const& new_value) noexcept
{
    T old_value = min_value;
    while ( old_value > new_value && !min_value.compare_exchange_weak(old_value, new_value) ) {}
}
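
// A minimal usage sketch (illustrative only; maxLatency and recordLatency are
// placeholder names): several threads can record a running maximum without a
// lock. compare_exchange_weak reloads old_value on failure, so the loop stops
// as soon as the stored value is already >= new_value (or the exchange wins).
//
//     std::atomic<uint64_t> maxLatency(0);
//
//     void recordLatency(uint64_t ns)
//     {
//         SST::Core::ThreadSafe::atomic_fetch_max(maxLatency, ns);
//     }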

} // namespace SST::Core::ThreadSafe

#endif // SST_CORE_THREADSAFE_H