12 #ifndef SST_CORE_CORE_THREADSAFE_H
13 #define SST_CORE_CORE_THREADSAFE_H
15 #include <x86intrin.h>
19 #include <condition_variable>
27 #include <sst/core/profile.h>
31 namespace ThreadSafe {
33 #if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8 ))
34 # define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
35 # define CACHE_ALIGNED_T
37 # define CACHE_ALIGNED(type, name) alignas(64) type name
38 # define CACHE_ALIGNED_T alignas(64)
44 std::atomic<bool> enabled;
45 std::atomic<size_t> count, generation;
47 Barrier(
size_t count) : origCount(count), enabled(
true),
48 count(count), generation(0)
52 Barrier() : origCount(0), enabled(
false), count(0), generation(0)
59 count = origCount = newCount;
73 auto startTime = SST::Core::Profile::now();
75 size_t gen = generation.load(std::memory_order_acquire);
77 size_t c = count.fetch_sub(1) -1;
80 count.store(origCount);
83 generation.fetch_add(1, std::memory_order_release);
92 else if ( count < (1024*1024) )
93 std::this_thread::yield();
100 }
while ( gen == generation.load(std::memory_order_acquire) );
102 elapsed = SST::Core::Profile::getElapsed(startTime);
109 enabled.store(
false);
117 std::atomic<int> latch;
125 while ( !latch.compare_exchange_weak(zero, 1,
126 std::memory_order_acquire,
127 std::memory_order_relaxed) && zero) {
131 }
while ( latch.load(std::memory_order_acquire) );
135 inline void unlock() {
147 std::atomic<size_t> sequence;
154 CACHE_ALIGNED(std::atomic<size_t>, rPtr);
155 CACHE_ALIGNED(std::atomic<size_t>, wPtr);
166 void initialize(
size_t maxSize) {
167 if ( initialized )
return;
169 data =
new cell_t[dsize];
170 for (
size_t i = 0 ; i < maxSize ; i++ )
171 data[i].sequence.store(i);
180 if ( initialized )
delete [] data;
185 return (wPtr.load() - rPtr.load());
190 return (rPtr.load() == wPtr.load());
193 bool try_insert(
const T& arg)
196 size_t pos = wPtr.load(std::memory_order_relaxed);
198 cell = &data[pos % dsize];
199 size_t seq = cell->sequence.load(std::memory_order_acquire);
200 intptr_t diff = (intptr_t)seq - (intptr_t)pos;
202 if ( wPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
204 }
else if ( 0 > diff ) {
208 pos = wPtr.load(std::memory_order_relaxed);
212 cell->sequence.store(pos+1, std::memory_order_release);
216 bool try_remove(T &res)
219 size_t pos = rPtr.load(std::memory_order_relaxed);
221 cell = &data[pos % dsize];
222 size_t seq = cell->sequence.load(std::memory_order_acquire);
223 intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
225 if ( rPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
227 }
else if ( 0 > diff ) {
230 pos = rPtr.load(std::memory_order_relaxed);
234 cell->sequence.store(pos + dsize, std::memory_order_release);
241 if ( try_remove(res) ) {
251 struct CACHE_ALIGNED_T Node {
252 std::atomic<Node*> next;
255 Node() : next(
nullptr) { }
258 CACHE_ALIGNED(Node*, first);
259 CACHE_ALIGNED(Node*, last);
260 CACHE_ALIGNED(
Spinlock, consumerLock);
261 CACHE_ALIGNED(
Spinlock, producerLock);
266 first = last =
new Node();
270 while( first !=
nullptr ) {
277 void insert(
const T& t) {
278 Node* tmp =
new Node();
280 std::lock_guard<Spinlock> lock(producerLock);
285 bool try_remove( T& result) {
286 std::lock_guard<Spinlock> lock(consumerLock);
287 Node* theFirst = first;
288 Node* theNext = first->next;
289 if( theNext !=
nullptr ) {
290 result = theNext->data;
301 if ( try_remove(res) ) {
double wait()
Wait for all threads to reach this point.
Definition: threadsafe.h:69
void resize(size_t newCount)
ONLY call this while nobody is in wait()
Definition: threadsafe.h:57
Definition: threadsafe.h:116
Definition: threadsafe.h:250
Definition: threadsafe.h:42
Definition: threadsafe.h:144