12 #ifndef SST_CORE_CORE_THREADSAFE_H
13 #define SST_CORE_CORE_THREADSAFE_H
15 #include <x86intrin.h>
19 #include <condition_variable>
27 #include <sst/core/profile.h>
31 namespace ThreadSafe {
// Cache-line alignment helpers, both using a 64-byte alignment.
// The first pair of definitions applies to GCC 4.x releases older than 4.8 and
// uses the GNU attribute spelling. The second pair uses alignas(64) and is
// presumably the fallback branch (the #else / #endif lines are not visible in
// this listing -- confirm against the full file).
33 #if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8 ))
// CACHE_ALIGNED(type, name): declare a variable `name` of `type`, 64-byte aligned.
34 # define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
// In this branch CACHE_ALIGNED_T expands to nothing, so a type tagged with it
// receives no extra alignment.
35 # define CACHE_ALIGNED_T
37 # define CACHE_ALIGNED(type, name) alignas(64) type name
38 # define CACHE_ALIGNED_T alignas(64)
// Barrier state visible in this listing: an `enabled` flag and the `count` and
// `generation` counters, all stored as atomics.
44 std::atomic<bool> enabled;
45 std::atomic<size_t> count, generation;
// Sized constructor: origCount and count both start at `count`, the barrier is
// marked enabled, and generation starts at 0.
47 Barrier(
size_t count) : origCount(count), enabled(
true),
48 count(count), generation(0)
// Default constructor: all counters are 0 and the barrier is marked disabled.
// Presumably it is given a real size later via resize() -- confirm against callers.
52 Barrier() : origCount(0), enabled(
false), count(0), generation(0)
// Sets both the live counter and the stored original count to newCount
// (presumably the body of resize(); the signature line is not visible here).
59 count = origCount = newCount;
// Timestamp taken before waiting; used below to compute `elapsed`.
73 auto startTime = SST::Core::Profile::now();
// Snapshot of the generation counter; the loop condition below compares
// against this value.
75 size_t gen = generation.load();
// The loop keeps running while the generation counter (acquire load) still
// equals the snapshot, i.e. until some other code changes `generation`.
// The loop body is not visible in this listing.
85 }
while ( gen == generation.load(std::memory_order_acquire) );
// Time elapsed since startTime, as reported by the profiling helper.
87 elapsed = SST::Core::Profile::getElapsed(startTime);
// Lock word. The code below tries to swap it from the value held in `zero`
// to 1, and spins while it reads non-zero.
102 std::atomic<int> latch;
// Acquire attempt: weak compare-exchange with acquire ordering on success and
// relaxed ordering on failure. On failure compare_exchange_weak stores the
// observed latch value into `zero`, so the `&& zero` test enters the loop body
// only when the latch was observed non-zero.
// NOTE(review): compare_exchange_weak may fail spuriously. Assuming `zero`
// starts at 0 (its declaration is not visible here), a spurious failure leaves
// `zero` at 0, which makes the whole condition false and exits the loop even
// though the exchange did not succeed -- confirm whether this can return
// without holding the lock on targets where weak CAS can fail spuriously.
110 while ( !latch.compare_exchange_weak(zero, 1,
111 std::memory_order_acquire,
112 std::memory_order_relaxed) && zero) {
// Inner wait: keep looping while the latch reads non-zero (acquire load)
// before retrying the exchange above. The inner loop body is not visible here.
116 }
while ( latch.load(std::memory_order_acquire) );
// unlock(): body not visible in this listing.
120 inline void unlock() {
// Per-cell sequence number. initialize() seeds the cell at index i with i;
// try_insert / try_remove compare it against their positions.
132 std::atomic<size_t> sequence;
// Read and write positions, each declared through CACHE_ALIGNED.
139 CACHE_ALIGNED(std::atomic<size_t>, rPtr);
140 CACHE_ALIGNED(std::atomic<size_t>, wPtr);
// initialize(maxSize): returns immediately when `initialized` is already set.
// Otherwise allocates an array of dsize cells and stores each index into the
// sequence number of the first maxSize cells.
// NOTE(review): the allocation is sized by dsize while the loop is bounded by
// maxSize; presumably dsize is assigned from maxSize on a line not visible in
// this listing -- confirm.
151 void initialize(
size_t maxSize) {
152 if ( initialized )
return;
154 data =
new cell_t[dsize];
155 for (
size_t i = 0 ; i < maxSize ; i++ )
156 data[i].sequence.store(i);
// Cleanup: the cell array is released only when `initialized` is set.
165 if ( initialized )
delete [] data;
// Difference between the write position and the read position.
170 return (wPtr.load() - rPtr.load());
// True when the read and write positions are equal.
175 return (rPtr.load() == wPtr.load());
// try_insert(arg): attempts to claim the slot at the current write position.
// Only fragments of the body are visible in this listing; the comments below
// describe just those lines.
178 bool try_insert(
const T& arg)
// Start from the current write position (relaxed load).
181 size_t pos = wPtr.load(std::memory_order_relaxed);
// Positions map onto cells modulo dsize.
183 cell = &data[pos % dsize];
// Acquire load of the cell's sequence number.
184 size_t seq = cell->sequence.load(std::memory_order_acquire);
// Signed distance between the cell's sequence number and pos.
185 intptr_t diff = (intptr_t)seq - (intptr_t)pos;
// Try to advance wPtr from pos to pos+1. If the exchange fails,
// compare_exchange_weak writes the current wPtr value back into pos.
// The condition guarding this line is not visible here.
187 if ( wPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
189 }
else if ( 0 > diff ) {
// Re-read the write position (relaxed). Which branch this belongs to is not
// visible in this listing.
193 pos = wPtr.load(std::memory_order_relaxed);
// Publish with release ordering: the sequence becomes pos+1, which is the
// value try_remove subtracts (pos + 1) when it computes its own diff.
197 cell->sequence.store(pos+1, std::memory_order_release);
// try_remove(res): attempts to claim the slot at the current read position.
// Only fragments of the body are visible in this listing; the comments below
// describe just those lines.
201 bool try_remove(T &res)
// Start from the current read position (relaxed load).
204 size_t pos = rPtr.load(std::memory_order_relaxed);
// Positions map onto cells modulo dsize.
206 cell = &data[pos % dsize];
// Acquire load of the cell's sequence number.
207 size_t seq = cell->sequence.load(std::memory_order_acquire);
// Signed distance between the cell's sequence number and pos + 1, the value
// try_insert stores into the sequence after claiming position pos.
208 intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
// Try to advance rPtr from pos to pos+1. If the exchange fails,
// compare_exchange_weak writes the current rPtr value back into pos.
// The condition guarding this line is not visible here.
210 if ( rPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
212 }
else if ( 0 > diff ) {
// Re-read the read position (relaxed). Which branch this belongs to is not
// visible in this listing.
215 pos = rPtr.load(std::memory_order_relaxed);
// Release store of pos + dsize into the sequence: pos + dsize maps to this
// same cell (index is pos % dsize), so presumably this marks the cell as
// writable for the next lap -- confirm against the hidden diff check.
219 cell->sequence.store(pos + dsize, std::memory_order_release);
// Separate call site (enclosing function not visible in this listing): the
// branch is taken when try_remove(res) returns true.
226 if ( try_remove(res) ) {
// List node, tagged with CACHE_ALIGNED_T. `next` is an atomic link that the
// constructor initialises to nullptr. try_remove reads a `data` member from
// nodes, but its declaration is not visible in this listing.
236 struct CACHE_ALIGNED_T Node {
237 std::atomic<Node*> next;
240 Node() : next(
nullptr) { }
// The first/last pointers and the consumer/producer spinlocks are each
// declared through CACHE_ALIGNED.
243 CACHE_ALIGNED(Node*, first);
244 CACHE_ALIGNED(Node*, last);
245 CACHE_ALIGNED(
Spinlock, consumerLock);
246 CACHE_ALIGNED(
Spinlock, producerLock);
// Both first and last start out pointing at one freshly allocated Node
// (presumably in the constructor). try_remove reads its result from
// first->next, so this initial node carries no element of its own.
251 first = last =
new Node();
// Loop that runs while first is non-null (presumably destructor cleanup; the
// loop body is not visible in this listing).
255 while( first !=
nullptr ) {
// insert(t): allocates a new Node, then takes producerLock through a
// lock_guard. The allocation happens before the lock is taken. How the node
// is filled and linked is not visible in this listing.
262 void insert(
const T& t) {
263 Node* tmp =
new Node();
265 std::lock_guard<Spinlock> lock(producerLock);
// try_remove(result): runs under consumerLock (lock_guard). Only part of the
// body is visible in this listing.
270 bool try_remove( T& result) {
271 std::lock_guard<Spinlock> lock(consumerLock);
// Snapshot the current head node and its successor.
272 Node* theFirst = first;
273 Node* theNext = first->next;
// When a successor node exists, its data is copied into result.
274 if( theNext !=
nullptr ) {
275 result = theNext->data;
// Separate call site (enclosing function not visible in this listing): the
// branch is taken when try_remove(res) returns true.
286 if ( try_remove(res) ) {
double wait()
Wait for all threads to reach this point.
Definition: threadsafe.h:69
void resize(size_t newCount)
ONLY call this while nobody is in wait()
Definition: threadsafe.h:57
Definition: threadsafe.h:101
Definition: threadsafe.h:235
Definition: threadsafe.h:42
Definition: threadsafe.h:129