SST 12.1.0
Structural Simulation Toolkit
threadsafe.h
// Copyright 2009-2022 NTESS. Under the terms
// of Contract DE-NA0003525 with NTESS, the U.S.
// Government retains certain rights in this software.
//
// Copyright (c) 2009-2022, NTESS
// All rights reserved.
//
// This file is part of the SST software package. For license
// information, see the LICENSE file in the top level directory of the
// distribution.

#ifndef SST_CORE_THREADSAFE_H
#define SST_CORE_THREADSAFE_H

#if ( defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) )
#include <x86intrin.h>
#define sst_pause() _mm_pause()
#elif (defined(__arm__) || defined(__arm) || defined(__aarch64__))
#define sst_pause() __asm__ __volatile__("yield")
#elif defined(__PPC64__)
#define sst_pause() __asm__ __volatile__("or 27, 27, 27" ::: "memory");
#endif

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
//#include <stdalign.h>

#include "sst/core/profile.h"

#include <time.h>

namespace SST {
namespace Core {
namespace ThreadSafe {

#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8))
#define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
#define CACHE_ALIGNED_T
#else
#define CACHE_ALIGNED(type, name) alignas(64) type name
#define CACHE_ALIGNED_T alignas(64)
#endif

class CACHE_ALIGNED_T Barrier
{
    size_t origCount;
    std::atomic<bool> enabled;
    std::atomic<size_t> count, generation;

public:
    Barrier(size_t count) : origCount(count), enabled(true), count(count), generation(0) {}

    // Come g++ 4.7, this can become a delegating constructor
    Barrier() : origCount(0), enabled(false), count(0), generation(0) {}

    /** ONLY call this while nobody is in wait() */
    void resize(size_t newCount)
    {
        count = origCount = newCount;
        generation.store(0);
        enabled.store(true);
    }

    /**
     * Wait for all threads to reach this point.
     * @return 0.0, or elapsed time spent waiting, if configured with --enable-profile
     */
    double wait()
    {
        double elapsed = 0.0;
        if ( enabled ) {
            auto startTime = SST::Core::Profile::now();

            size_t gen = generation.load(std::memory_order_acquire);
            asm("" ::: "memory");
            size_t c = count.fetch_sub(1) - 1;
            if ( 0 == c ) {
                /* We should release */
                count.store(origCount);
                asm("" ::: "memory");
                /* Incrementing generation causes release */
                generation.fetch_add(1, std::memory_order_release);
                __sync_synchronize();
            }
            else {
                /* Try spinning first */
                uint32_t count = 0;
                do {
                    count++;
                    if ( count < 1024 ) { sst_pause(); }
                    else if ( count < (1024 * 1024) ) {
                        std::this_thread::yield();
                    }
                    else {
                        struct timespec ts;
                        ts.tv_sec = 0;
                        ts.tv_nsec = 1000;
                        nanosleep(&ts, nullptr);
                    }
                } while ( gen == generation.load(std::memory_order_acquire) );
            }
            elapsed = SST::Core::Profile::getElapsed(startTime);
        }
        return elapsed;
    }

    void disable()
    {
        enabled.store(false);
        count.store(0);
        ++generation;
    }
};
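
A minimal usage sketch for Barrier (illustrative only, not part of the header; the include path and thread count are assumptions): each worker thread calls wait(), and none proceeds past the barrier until all have arrived.

#include <cstdio>
#include <thread>
#include <vector>

#include "sst/core/threadsafe.h" // assumed install path of this header

int main()
{
    const size_t nThreads = 4; // illustrative thread count
    SST::Core::ThreadSafe::Barrier barrier(nThreads);

    std::vector<std::thread> workers;
    for ( size_t i = 0; i < nThreads; i++ ) {
        workers.emplace_back([&barrier, i]() {
            // ... per-thread setup work ...
            barrier.wait(); // spins/yields until all nThreads threads arrive
            printf("thread %zu passed the barrier\n", i);
        });
    }
    for ( auto& t : workers )
        t.join();
    return 0;
}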

#if 0
typedef std::mutex Spinlock;
#else
class Spinlock
{
    std::atomic_flag latch = ATOMIC_FLAG_INIT;

public:
    Spinlock() {}

    inline void lock()
    {
        while ( latch.test_and_set(std::memory_order_acquire) ) {
            sst_pause();
#if defined(__PPC64__)
            __sync_synchronize();
#endif
        }
    }

    inline void unlock() { latch.clear(std::memory_order_release); }
};
#endif
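
An illustrative sketch (not part of the header) of guarding shared state with Spinlock. Since it provides lock()/unlock(), it satisfies BasicLockable and works with std::lock_guard, which is how UnboundedQueue below uses it. The names counterLock, sharedCounter, and increment() are placeholders.

#include <cstdint>
#include <mutex>

#include "sst/core/threadsafe.h" // assumed install path of this header

static SST::Core::ThreadSafe::Spinlock counterLock;
static uint64_t sharedCounter = 0;

void increment()
{
    // lock() spins (with sst_pause) until the flag is acquired
    std::lock_guard<SST::Core::ThreadSafe::Spinlock> guard(counterLock);
    ++sharedCounter;
}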

class EmptySpinlock
{
public:
    EmptySpinlock() {}
    inline void lock() {}

    inline void unlock() {}
};

template <typename T>
class BoundedQueue
{

    struct cell_t
    {
        std::atomic<size_t> sequence;
        T data;
    };

    bool initialized;
    size_t dsize;
    cell_t* data;
    CACHE_ALIGNED(std::atomic<size_t>, rPtr);
    CACHE_ALIGNED(std::atomic<size_t>, wPtr);

public:
    // BoundedQueue(size_t maxSize) : dsize(maxSize)
    BoundedQueue(size_t maxSize) : initialized(false) { initialize(maxSize); }

    BoundedQueue() : initialized(false) {}

    void initialize(size_t maxSize)
    {
        if ( initialized ) return;
        dsize = maxSize;
        data = new cell_t[dsize];
        for ( size_t i = 0; i < maxSize; i++ )
            data[i].sequence.store(i);
        rPtr.store(0);
        wPtr.store(0);
        // fprintf(stderr, "%p %p: %ld\n", &rPtr, &wPtr, ((intptr_t)&wPtr - (intptr_t)&rPtr));
        initialized = true;
    }

    ~BoundedQueue()
    {
        if ( initialized ) delete[] data;
    }

    size_t size() const { return (wPtr.load() - rPtr.load()); }

    bool empty() const { return (rPtr.load() == wPtr.load()); }

    bool try_insert(const T& arg)
    {
        cell_t* cell = nullptr;
        size_t pos = wPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if ( 0 == diff ) {
                if ( wPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                // fprintf(stderr, "diff = %ld: %zu - %zu\n", diff, seq, pos);
                return false;
            }
            else {
                pos = wPtr.load(std::memory_order_relaxed);
            }
        }
        cell->data = arg;
        cell->sequence.store(pos + 1, std::memory_order_release);
        return true;
    }

    bool try_remove(T& res)
    {
        cell_t* cell = nullptr;
        size_t pos = rPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if ( 0 == diff ) {
                if ( rPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                return false;
            }
            else {
                pos = rPtr.load(std::memory_order_relaxed);
            }
        }
        res = cell->data;
        cell->sequence.store(pos + dsize, std::memory_order_release);
        return true;
    }

    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) { return res; }
            sst_pause();
        }
    }
};
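
A usage sketch (not part of the header; capacity and item counts are illustrative): one producer and one consumer sharing a fixed-capacity BoundedQueue<int>. try_insert() returns false when the ring is full, while remove() spins until an item is available.

#include <cstdio>
#include <thread>

#include "sst/core/threadsafe.h" // assumed install path of this header

int main()
{
    SST::Core::ThreadSafe::BoundedQueue<int> queue(1024); // capacity fixed at 1024 slots

    std::thread producer([&queue]() {
        for ( int i = 0; i < 100; i++ ) {
            while ( !queue.try_insert(i) ) { /* retry if momentarily full */ }
        }
    });

    std::thread consumer([&queue]() {
        for ( int i = 0; i < 100; i++ ) {
            int value = queue.remove(); // spins until an item arrives
            printf("got %d\n", value);
        }
    });

    producer.join();
    consumer.join();
    return 0;
}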

template <typename T>
class UnboundedQueue
{
    struct CACHE_ALIGNED_T Node
    {
        std::atomic<Node*> next;
        T data;

        Node() : next(nullptr) {}
    };

    CACHE_ALIGNED(Node*, first);
    CACHE_ALIGNED(Node*, last);
    CACHE_ALIGNED(Spinlock, consumerLock);
    CACHE_ALIGNED(Spinlock, producerLock);

public:
    UnboundedQueue()
    {
        /* 'first' is a dummy value */
        first = last = new Node();
    }

    ~UnboundedQueue()
    {
        while ( first != nullptr ) { // release the list
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }

    void insert(const T& t)
    {
        Node* tmp = new Node();
        tmp->data = t;
        std::lock_guard<Spinlock> lock(producerLock);
        last->next = tmp; // publish to consumers
        last = tmp;       // swing last forward
    }

    bool try_remove(T& result)
    {
        std::lock_guard<Spinlock> lock(consumerLock);
        Node* theFirst = first;
        Node* theNext = first->next;
        if ( theNext != nullptr ) { // if queue is nonempty
            result = theNext->data; // take it out
            first = theNext;        // swing first forward
            delete theFirst;        // delete the old dummy
            return true;
        }
        return false;
    }

    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) { return res; }
            sst_pause();
        }
    }
};
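
An illustrative sketch (not part of the header; the include path and item type are assumptions): insert() never fails because the linked list grows as needed, and try_remove() simply returns false while only the dummy node remains.

#include <string>
#include <thread>

#include "sst/core/threadsafe.h" // assumed install path of this header

int main()
{
    SST::Core::ThreadSafe::UnboundedQueue<std::string> queue;

    std::thread producer([&queue]() {
        queue.insert("hello");
        queue.insert("world");
    });

    std::string item;
    int received = 0;
    while ( received < 2 ) {
        if ( queue.try_remove(item) ) { received++; } // non-blocking; remove() would spin instead
    }

    producer.join();
    return 0;
}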

} // namespace ThreadSafe
} // namespace Core
} // namespace SST

#endif // SST_CORE_THREADSAFE_H