12 #ifndef SST_CORE_THREADSAFE_H
13 #define SST_CORE_THREADSAFE_H
// sst_pause(): architecture-specific instruction issued from the busy-wait loops in this file.
// x86-64: expands to the _mm_pause() intrinsic declared in <x86intrin.h>.
15 #if ( defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) )
16 #include <x86intrin.h>
17 #define sst_pause() _mm_pause()
// 32-bit and 64-bit ARM: emits the "yield" instruction via inline assembly.
// Note that this variant declares no "memory" clobber.
18 #elif (defined(__arm__) || defined(__arm) || defined(__aarch64__))
19 #define sst_pause() __asm__ __volatile__("yield")
// 64-bit PowerPC: the definition follows on the next line.
20 #elif defined(__PPC64__)
// 64-bit PowerPC: "or 27, 27, 27" is a no-op form of `or` used here as the spin-wait hint
// (NOTE(review): believed to be the Power ISA "yield" hint -- confirm against the ISA manual).
// The "memory" clobber stops the compiler from moving memory accesses across the pause.
// No trailing semicolon: the caller supplies it, as with the x86 and ARM variants, so that
// `if (c) sst_pause(); else ...` expands to a single statement on every architecture.
#define sst_pause() __asm__ __volatile__("or 27, 27, 27" ::: "memory")
25 #include <condition_variable>
31 #include "sst/core/profile.h"
37 namespace ThreadSafe {
// Alignment helpers: CACHE_ALIGNED(type, name) declares `name` with 64-byte alignment, and
// CACHE_ALIGNED_T is placed in a struct/class head to align the type itself.
// GCC 4.x older than 4.8: use the GNU `aligned` attribute on the declared name.
// CACHE_ALIGNED_T expands to nothing here, so types tagged with it get no extra alignment
// on those compilers.
39 #if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8))
40 #define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
41 #define CACHE_ALIGNED_T
// Standard form using alignas(64).
// NOTE(review): presumably guarded by an #else that is not visible in this excerpt -- confirm.
43 #define CACHE_ALIGNED(type, name) alignas(64) type name
44 #define CACHE_ALIGNED_T alignas(64)
/** Flag set to true by the counted constructor and to false by the default constructor;
    it is also cleared elsewhere in the class via enabled.store(false). */
50 std::atomic<bool> enabled;
/** count: decremented with fetch_sub(1) by the waiting code and reset to origCount.
    generation: incremented with fetch_add(1); waiters poll it until it differs from the
    value they loaded on entry. */
51 std::atomic<size_t> count, generation;
/**
 * Construct an enabled barrier.
 * @param count value stored in both origCount and the live counter; generation starts at 0.
 *        NOTE(review): presumably the number of participating threads -- confirm with callers.
 */
54 Barrier(
size_t count) : origCount(count), enabled(
true), count(count), generation(0) {}
/**
 * Construct a barrier with enabled set to false and origCount, count and generation all zero.
 * NOTE(review): presumably meant to be sized later through resize() -- confirm.
 */
57 Barrier() : origCount(0), enabled(
false), count(0), generation(0) {}
62 count = origCount = newCount;
75 auto startTime = SST::Core::Profile::now();
77 size_t gen = generation.load(std::memory_order_acquire);
79 size_t c = count.fetch_sub(1) - 1;
82 count.store(origCount);
85 generation.fetch_add(1, std::memory_order_release);
93 if ( count < 1024 ) { sst_pause(); }
94 else if ( count < (1024 * 1024) ) {
95 std::this_thread::yield();
101 nanosleep(&ts,
nullptr);
103 }
while ( gen == generation.load(std::memory_order_acquire) );
105 elapsed = SST::Core::Profile::getElapsed(startTime);
112 enabled.store(
false);
/** Lock bit: acquired by spinning on test_and_set(memory_order_acquire) and released with
    clear(memory_order_release). */
123 std::atomic_flag latch = ATOMIC_FLAG_INIT;
/** Id of the thread recorded after the latch is taken; reset to a default-constructed id
    before the latch is cleared. The acquire path compares it with the calling thread's id
    and prints "DEADLOCK" on a match. Plain (non-atomic) member. */
124 std::thread::id thread_id;
131 if ( thread_id == std::this_thread::get_id() ) printf(
"DEADLOCK\n");
132 while ( latch.test_and_set(std::memory_order_acquire) ) {
134 #if defined(__PPC64__)
135 __sync_synchronize();
138 thread_id = std::this_thread::get_id();
144 thread_id = std::thread::id();
145 latch.clear(std::memory_order_release);
/** No-op lock(): the body is empty, so no synchronization is performed. */
154 inline void lock() {}
/** No-op unlock(): the body is empty, matching the no-op lock(). */
156 inline void unlock() {}
159 template <
typename T>
165 std::atomic<size_t> sequence;
/** Read position: loaded and advanced by compare-exchange in try_remove(). Declared through
    CACHE_ALIGNED, i.e. with 64-byte alignment. */
172 CACHE_ALIGNED(std::atomic<size_t>, rPtr);
/** Write position: loaded and advanced by compare-exchange in try_insert(). Also 64-byte
    aligned. NOTE(review): presumably to keep the two counters on separate cache lines. */
173 CACHE_ALIGNED(std::atomic<size_t>, wPtr);
/**
 * Construct and immediately initialize the queue.
 * `initialized` is set to false first so that the initialize() call is not short-circuited
 * by its own "already initialized" guard.
 * @param maxSize forwarded unchanged to initialize().
 */
177 BoundedQueue(
size_t maxSize) : initialized(
false) { initialize(maxSize); }
181 void initialize(
size_t maxSize)
183 if ( initialized )
return;
185 data =
new cell_t[dsize];
186 for (
size_t i = 0; i < maxSize; i++ )
187 data[i].sequence.store(i);
196 if ( initialized )
delete[] data;
/**
 * @return wPtr minus rPtr, each read with its own separate atomic load.
 * The two loads are not taken together, so the result is only a snapshot if other threads
 * are inserting or removing at the same time.
 */
199 size_t size()
const {
return (wPtr.load() - rPtr.load()); }
/**
 * @return true when the read and write positions compare equal.
 * rPtr and wPtr are loaded separately, so the answer can be stale under concurrent use.
 */
201 bool empty()
const {
return (rPtr.load() == wPtr.load()); }
203 bool try_insert(
const T& arg)
205 cell_t* cell =
nullptr;
206 size_t pos = wPtr.load(std::memory_order_relaxed);
208 cell = &data[pos % dsize];
209 size_t seq = cell->sequence.load(std::memory_order_acquire);
210 intptr_t diff = (intptr_t)seq - (intptr_t)pos;
212 if ( wPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) )
break;
214 else if ( 0 > diff ) {
219 pos = wPtr.load(std::memory_order_relaxed);
223 cell->sequence.store(pos + 1, std::memory_order_release);
227 bool try_remove(T& res)
229 cell_t* cell =
nullptr;
230 size_t pos = rPtr.load(std::memory_order_relaxed);
232 cell = &data[pos % dsize];
233 size_t seq = cell->sequence.load(std::memory_order_acquire);
234 intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
236 if ( rPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) )
break;
238 else if ( 0 > diff ) {
242 pos = rPtr.load(std::memory_order_relaxed);
246 cell->sequence.store(pos + dsize, std::memory_order_release);
254 if ( try_remove(res) ) {
return res; }
260 template <
typename T>
263 struct CACHE_ALIGNED_T Node
265 std::atomic<Node*> next;
/** Construct a node whose `next` link is null; no other member is named in the initializer list. */
268 Node() : next(
nullptr) {}
/** Head of the node list. try_remove() reads `first` and `first->next` and takes its result
    from the node after `first`. */
271 CACHE_ALIGNED(Node*, first);
/** Tail pointer; the constructor points both `first` and `last` at the same freshly
    allocated node. */
272 CACHE_ALIGNED(Node*, last);
/** Spinlock held (via std::lock_guard) by try_remove(). */
273 CACHE_ALIGNED(
Spinlock, consumerLock);
/** Spinlock held (via std::lock_guard) by insert(). Each of these four members is declared
    through CACHE_ALIGNED, i.e. with 64-byte alignment. */
274 CACHE_ALIGNED(
Spinlock, producerLock);
280 first = last =
new Node();
285 while ( first !=
nullptr ) {
292 void insert(
const T& t)
294 Node* tmp =
new Node();
296 std::lock_guard<Spinlock> lock(producerLock);
301 bool try_remove(T& result)
303 std::lock_guard<Spinlock> lock(consumerLock);
304 Node* theFirst = first;
305 Node* theNext = first->next;
306 if ( theNext !=
nullptr ) {
307 result = theNext->data;
319 if ( try_remove(res) ) {
return res; }
329 #endif // SST_CORE_THREADSAFE_H
double wait()
Wait for all threads to reach this point.
Definition: threadsafe.h:71
void resize(size_t newCount)
ONLY call this while nobody is in wait()
Definition: threadsafe.h:60
Definition: threadsafe.h:121
Definition: threadsafe.h:261
Definition: threadsafe.h:47
Definition: threadsafe.h:150
Definition: threadsafe.h:160