12 #ifndef SST_CORE_CORE_THREADSAFE_H
13 #define SST_CORE_CORE_THREADSAFE_H
/* sst_pause(): architecture-specific CPU "relax" hint used inside spin loops.
 * x86: _mm_pause(); ARM/AArch64: yield; POWER: priority-lowering nop.
 * FIX: the PPC64 variant previously ended with a trailing ';' inside the
 * macro body, so `if (x) sst_pause(); else ...` expanded to a syntax error.
 * Also restore the empty fallback so unknown targets still compile. */
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
#include <x86intrin.h>
#define sst_pause() _mm_pause()
#elif ( defined(__arm__) || defined(__arm) || defined(__aarch64__) )
#define sst_pause() __asm__ __volatile__ ("yield")
#elif defined(__PPC64__)
#define sst_pause() __asm__ __volatile__ ( "or 27, 27, 27" ::: "memory" )
#else
#define sst_pause()
#endif
27 #include <condition_variable>
35 #include "sst/core/profile.h"
39 namespace ThreadSafe {
/* Cache-line (64-byte) alignment helpers, used to keep hot atomics on
 * separate cache lines and avoid false sharing.
 * FIX: restore the #else/#endif that the garbled source dropped — without
 * them both the GCC<4.8 attribute form and the C++11 alignas form were
 * defined unconditionally, redefining the macros. */
#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8 ))
/* GCC before 4.8 lacks alignas(); fall back to the GNU attribute. */
# define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
# define CACHE_ALIGNED_T
#else
# define CACHE_ALIGNED(type, name) alignas(64) type name
# define CACHE_ALIGNED_T alignas(64)
#endif
52 std::atomic<bool> enabled;
53 std::atomic<size_t> count, generation;
55 Barrier(
size_t count) : origCount(count), enabled(
true),
56 count(count), generation(0)
60 Barrier() : origCount(0), enabled(
false), count(0), generation(0)
67 count = origCount = newCount;
81 auto startTime = SST::Core::Profile::now();
83 size_t gen = generation.load(std::memory_order_acquire);
85 size_t c = count.fetch_sub(1) -1;
88 count.store(origCount);
91 generation.fetch_add(1, std::memory_order_release);
100 }
else if ( count < (1024*1024) ) {
101 std::this_thread::yield();
106 nanosleep(&ts,
nullptr);
108 }
while ( gen == generation.load(std::memory_order_acquire) );
110 elapsed = SST::Core::Profile::getElapsed(startTime);
117 enabled.store(
false);
128 std::atomic_flag latch = ATOMIC_FLAG_INIT;
133 while ( latch.test_and_set(std::memory_order_acquire) ) {
135 #if defined(__PPC64__)
136 __sync_synchronize();
141 inline void unlock() {
142 latch.clear(std::memory_order_release);
153 std::atomic<size_t> sequence;
160 CACHE_ALIGNED(std::atomic<size_t>, rPtr);
161 CACHE_ALIGNED(std::atomic<size_t>, wPtr);
172 void initialize(
size_t maxSize) {
173 if ( initialized )
return;
175 data =
new cell_t[dsize];
176 for (
size_t i = 0 ; i < maxSize ; i++ )
177 data[i].sequence.store(i);
186 if ( initialized )
delete [] data;
191 return (wPtr.load() - rPtr.load());
196 return (rPtr.load() == wPtr.load());
199 bool try_insert(
const T& arg)
201 cell_t *cell =
nullptr;
202 size_t pos = wPtr.load(std::memory_order_relaxed);
204 cell = &data[pos % dsize];
205 size_t seq = cell->sequence.load(std::memory_order_acquire);
206 intptr_t diff = (intptr_t)seq - (intptr_t)pos;
208 if ( wPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
210 }
else if ( 0 > diff ) {
214 pos = wPtr.load(std::memory_order_relaxed);
218 cell->sequence.store(pos+1, std::memory_order_release);
222 bool try_remove(T &res)
224 cell_t *cell =
nullptr;
225 size_t pos = rPtr.load(std::memory_order_relaxed);
227 cell = &data[pos % dsize];
228 size_t seq = cell->sequence.load(std::memory_order_acquire);
229 intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
231 if ( rPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
233 }
else if ( 0 > diff ) {
236 pos = rPtr.load(std::memory_order_relaxed);
240 cell->sequence.store(pos + dsize, std::memory_order_release);
247 if ( try_remove(res) ) {
257 struct CACHE_ALIGNED_T Node {
258 std::atomic<Node*> next;
261 Node() : next(
nullptr) { }
264 CACHE_ALIGNED(Node*, first);
265 CACHE_ALIGNED(Node*, last);
266 CACHE_ALIGNED(
Spinlock, consumerLock);
267 CACHE_ALIGNED(
Spinlock, producerLock);
272 first = last =
new Node();
276 while( first !=
nullptr ) {
283 void insert(
const T& t) {
284 Node* tmp =
new Node();
286 std::lock_guard<Spinlock> lock(producerLock);
291 bool try_remove( T& result) {
292 std::lock_guard<Spinlock> lock(consumerLock);
293 Node* theFirst = first;
294 Node* theNext = first->next;
295 if( theNext !=
nullptr ) {
296 result = theNext->data;
307 if ( try_remove(res) ) {
double wait()
    Wait for all threads to reach this point; returns the time spent waiting.
    (Defined at threadsafe.h:77.)
void resize(size_t newCount)
    Resize the barrier; ONLY call this while no thread is inside wait().
    (Defined at threadsafe.h:65.)
Definition: threadsafe.h:127
Definition: threadsafe.h:256
Definition: threadsafe.h:50
Definition: threadsafe.h:150