#ifndef SST_CORE_CORE_THREADSAFE_H
#define SST_CORE_CORE_THREADSAFE_H
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
#include <x86intrin.h>
#endif

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>
#include <time.h>

#include "sst/core/profile.h"

namespace SST {
namespace Core {
namespace ThreadSafe {
#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8))
#    define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
#    define CACHE_ALIGNED_T
#else
#    define CACHE_ALIGNED(type, name) alignas(64) type name
#    define CACHE_ALIGNED_T alignas(64)
#endif
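/*
 * Illustrative note (not part of the original header): CACHE_ALIGNED forces a
 * member onto its own 64-byte cache line so that hot counters touched by
 * different threads do not false-share. A minimal sketch; the struct name is
 * hypothetical:
 *
 *     struct Counters {
 *         CACHE_ALIGNED(std::atomic<size_t>, produced); // alignas(64) std::atomic<size_t> produced;
 *         CACHE_ALIGNED(std::atomic<size_t>, consumed); // 64-byte alignment keeps it on a separate line
 *     };
 */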
class CACHE_ALIGNED_T Barrier {
    size_t              origCount;
    std::atomic<bool>   enabled;
    std::atomic<size_t> count, generation;

public:
    Barrier(size_t count) : origCount(count), enabled(true), count(count), generation(0) {}

    Barrier() : origCount(0), enabled(false), count(0), generation(0) {}

    /** ONLY call this while nobody is in wait() */
    void resize(size_t newCount) {
        count = origCount = newCount;
        generation.store(0);
        enabled.store(true);
    }

    /** Wait for all threads to reach this point. */
    double wait() {
        double elapsed = 0.0;
        if ( enabled ) {
            auto startTime = SST::Core::Profile::now();

            size_t gen = generation.load(std::memory_order_acquire);
            size_t c   = count.fetch_sub(1) - 1;
            if ( 0 == c ) {
                /* Last thread to arrive: reset the count for the next round... */
                count.store(origCount);
                /* ...and bump the generation, which releases the waiting threads. */
                generation.fetch_add(1, std::memory_order_release);
            } else {
                /* Spin briefly, then yield, then sleep until the generation changes. */
                uint32_t spinCount = 0;
                do {
                    spinCount++;
                    if ( spinCount < 1024 ) {
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
                        _mm_pause();
#elif defined(__PPC64__)
                        asm volatile( "or 27, 27, 27" ::: "memory" );
#endif
                    } else if ( spinCount < (1024 * 1024) ) {
                        std::this_thread::yield();
                    } else {
                        struct timespec ts;
                        ts.tv_sec  = 0;
                        ts.tv_nsec = 1000;
                        nanosleep(&ts, nullptr);
                    }
                } while ( gen == generation.load(std::memory_order_acquire) );
            }
            elapsed = SST::Core::Profile::getElapsed(startTime);
        }
        return elapsed;
    }

    /** Disable the barrier: release any current waiters and make all
     *  subsequent wait() calls return immediately. */
    void disable() {
        enabled.store(false);
        count.store(0);
        ++generation;
    }
};

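/*
 * Illustrative usage sketch (not part of the original header): a fixed pool
 * of worker threads stepping through phases in lockstep on a shared Barrier.
 * The nThreads/nSteps values and doWork() are hypothetical.
 *
 *     SST::Core::ThreadSafe::Barrier barrier(nThreads);
 *
 *     void worker(int id)
 *     {
 *         for ( int step = 0; step < nSteps; ++step ) {
 *             doWork(id, step); // each thread does its share of the step
 *             barrier.wait();   // nobody starts step+1 until everyone finishes step
 *         }
 *     }
 */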
class Spinlock {
    std::atomic_flag latch = ATOMIC_FLAG_INIT;

public:
    Spinlock() {}

    inline void lock() {
        while ( latch.test_and_set(std::memory_order_acquire) ) {
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
            _mm_pause();
#elif defined(__PPC64__)
            asm volatile( "or 27, 27, 27" ::: "memory" );
            __sync_synchronize();
#endif
        }
    }

    inline void unlock() {
        latch.clear(std::memory_order_release);
    }
};

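/*
 * Illustrative usage sketch (not part of the original header): Spinlock
 * exposes lock()/unlock(), so it meets the BasicLockable requirements and
 * works with std::lock_guard, as the queue classes below rely on. The
 * protected counter is hypothetical.
 *
 *     SST::Core::ThreadSafe::Spinlock counterLock;
 *     uint64_t                        counter = 0;
 *
 *     void bump()
 *     {
 *         std::lock_guard<SST::Core::ThreadSafe::Spinlock> guard(counterLock);
 *         ++counter; // short critical section, the case a spinlock is suited for
 *     }
 */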
template <typename T>
class BoundedQueue {
    struct cell_t {
        std::atomic<size_t> sequence;
        T                   data;
    };

    bool    initialized;
    size_t  dsize;
    cell_t* data;
    CACHE_ALIGNED(std::atomic<size_t>, rPtr);
    CACHE_ALIGNED(std::atomic<size_t>, wPtr);

public:
    BoundedQueue(size_t maxSize) : initialized(false) { initialize(maxSize); }

    BoundedQueue() : initialized(false) {}

    void initialize(size_t maxSize) {
        if ( initialized ) return;
        dsize = maxSize;
        data  = new cell_t[dsize];
        for ( size_t i = 0; i < maxSize; i++ )
            data[i].sequence.store(i);
        rPtr.store(0);
        wPtr.store(0);
        initialized = true;
    }

    ~BoundedQueue() {
        if ( initialized ) delete[] data;
    }

    size_t size() const { return (wPtr.load() - rPtr.load()); }

    bool empty() const { return (rPtr.load() == wPtr.load()); }

    bool try_insert(const T& arg) {
        cell_t* cell = nullptr;
        size_t  pos  = wPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell = &data[pos % dsize];
            size_t   seq  = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if ( 0 == diff ) {
                /* Slot is free: try to claim it by advancing the write pointer. */
                if ( wPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            } else if ( 0 > diff ) {
                /* Slot has not been consumed yet: the queue is full. */
                return false;
            } else {
                /* Another producer claimed this slot: reload and retry. */
                pos = wPtr.load(std::memory_order_relaxed);
            }
        }
        cell->data = arg;
        cell->sequence.store(pos + 1, std::memory_order_release);
        return true;
    }

    bool try_remove(T& res) {
        cell_t* cell = nullptr;
        size_t  pos  = rPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell = &data[pos % dsize];
            size_t   seq  = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if ( 0 == diff ) {
                /* Slot holds data: try to claim it by advancing the read pointer. */
                if ( rPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            } else if ( 0 > diff ) {
                /* Slot has not been filled yet: the queue is empty. */
                return false;
            } else {
                /* Another consumer claimed this slot: reload and retry. */
                pos = rPtr.load(std::memory_order_relaxed);
            }
        }
        res = cell->data;
        cell->sequence.store(pos + dsize, std::memory_order_release);
        return true;
    }

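/*
 * Illustrative usage sketch (not part of the original header): one producer
 * and one consumer exchanging items through the non-blocking pair above,
 * with the blocking remove() below as the alternative. The capacity, the
 * Item type, and process() are hypothetical.
 *
 *     SST::Core::ThreadSafe::BoundedQueue<Item> queue(1024);
 *
 *     // producer: back off briefly whenever the ring is full
 *     while ( !queue.try_insert(item) ) std::this_thread::yield();
 *
 *     // consumer: poll without blocking, or call remove() to spin until an item arrives
 *     Item next;
 *     if ( queue.try_remove(next) ) process(next);
 */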
    T remove() {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
            _mm_pause();
#elif defined(__PPC64__)
            asm volatile( "or 27, 27, 27" ::: "memory" );
#endif
        }
    }
};

template <typename T>
class UnboundedQueue {
    struct CACHE_ALIGNED_T Node {
        std::atomic<Node*> next;
        T                  data;

        Node() : next(nullptr) {}
    };

    CACHE_ALIGNED(Node*, first);
    CACHE_ALIGNED(Node*, last);
    CACHE_ALIGNED(Spinlock, consumerLock);
    CACHE_ALIGNED(Spinlock, producerLock);

public:
    UnboundedQueue() {
        /* 'first' always points at a dummy node separating consumers from producers. */
        first = last = new Node();
    }

    ~UnboundedQueue() {
        while ( first != nullptr ) { // release the whole list
            Node* tmp = first;
            first     = tmp->next;
            delete tmp;
        }
    }

    void insert(const T& t) {
        Node* tmp = new Node();
        tmp->data = t;
        std::lock_guard<Spinlock> lock(producerLock);
        last->next = tmp; // publish the new node to consumers
        last       = tmp; // swing last forward
    }

    bool try_remove(T& result) {
        std::lock_guard<Spinlock> lock(consumerLock);
        Node* theFirst = first;
        Node* theNext  = first->next;
        if ( theNext != nullptr ) {  // queue is non-empty
            result = theNext->data;  // copy the value out
            first  = theNext;        // old second node becomes the new dummy
            delete theFirst;         // free the old dummy node
            return true;
        }
        return false;
    }

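/*
 * Illustrative usage sketch (not part of the original header): many producers
 * feeding one consumer; insert() never rejects an item but allocates a Node
 * per call. The Event type and handle() are hypothetical.
 *
 *     SST::Core::ThreadSafe::UnboundedQueue<Event*> inbox;
 *
 *     // producer side: any thread may insert; producers serialize on one Spinlock
 *     inbox.insert(new Event());
 *
 *     // consumer side: poll with try_remove(), or use remove() below to spin
 *     Event* ev = nullptr;
 *     if ( inbox.try_remove(ev) ) handle(ev);
 */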
    T remove() {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
            _mm_pause();
#elif defined(__PPC64__)
            asm volatile( "or 27, 27, 27" ::: "memory" );
#endif
        }
    }
};

} // namespace ThreadSafe
} // namespace Core
} // namespace SST

#endif // SST_CORE_CORE_THREADSAFE_H