#ifndef SST_CORE_THREADSAFE_H
#define SST_CORE_THREADSAFE_H

#if ( defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) )
#include <x86intrin.h>
#define sst_pause() _mm_pause()
#elif (defined(__arm__) || defined(__arm) || defined(__aarch64__))
#define sst_pause() __asm__ __volatile__("yield")
#elif defined(__PPC64__)
#define sst_pause() __asm__ __volatile__("or 27, 27, 27" ::: "memory");
#else
#define sst_pause() /* no architecture-specific pause hint */
#endif

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <time.h> // nanosleep(), struct timespec
#include <vector>

#include "sst/core/profile.h"

namespace SST {
namespace Core {
namespace ThreadSafe {
#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8))
#define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
#define CACHE_ALIGNED_T
#else
#define CACHE_ALIGNED(type, name) alignas(64) type name
#define CACHE_ALIGNED_T alignas(64)
#endif

class Barrier
{
    size_t              origCount;
    std::atomic<bool>   enabled;
    std::atomic<size_t> count, generation;
public:
    Barrier(size_t count) : origCount(count), enabled(true), count(count), generation(0) {}

    Barrier() : origCount(0), enabled(false), count(0), generation(0) {}
    /** ONLY call this while nobody is in wait() */
    void resize(size_t newCount)
    {
        count = origCount = newCount;
        generation.store(0);
        enabled.store(true);
    }
    /** Wait for all threads to reach this point. */
    double wait()
    {
        double elapsed = 0.0;
        if ( enabled ) {
            auto startTime = SST::Core::Profile::now();

            size_t gen = generation.load(std::memory_order_acquire);
            size_t c   = count.fetch_sub(1) - 1;
            if ( 0 == c ) {
                /* Last thread in: reset the count and release the others */
                count.store(origCount);
                generation.fetch_add(1, std::memory_order_release);
            }
            else {
                /* Spin first, then yield, then sleep while waiting for release */
                uint32_t count = 0; // local spin counter (shadows the atomic member)
                do {
                    count++;
                    if ( count < 1024 ) { sst_pause(); }
                    else if ( count < (1024 * 1024) ) {
                        std::this_thread::yield();
                    }
                    else {
                        struct timespec ts;
                        ts.tv_sec  = 0;
                        ts.tv_nsec = 1000;
                        nanosleep(&ts, nullptr);
                    }
                } while ( gen == generation.load(std::memory_order_acquire) );
            }
            elapsed = SST::Core::Profile::getElapsed(startTime);
        }
        return elapsed;
    }
    void disable()
    {
        enabled.store(false);
        count.store(0);
        ++generation;
    }
};
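// Usage sketch for Barrier (illustrative only -- the thread count and the
// worker lambda below are hypothetical, not part of this header). Each thread
// calls wait() at the end of its phase and blocks until all participants arrive:
//
//   #include <thread>
//   #include <vector>
//
//   void barrier_example()
//   {
//       SST::Core::ThreadSafe::Barrier barrier(4);
//       std::vector<std::thread>       workers;
//       for ( int i = 0; i < 4; i++ ) {
//           workers.emplace_back([&barrier]() {
//               // ... per-thread phase work ...
//               barrier.wait(); // returns once all 4 threads have arrived
//           });
//       }
//       for ( auto& t : workers )
//           t.join();
//   }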
class Spinlock
{
    std::atomic_flag latch = ATOMIC_FLAG_INIT;

public:
    Spinlock() {}

    inline void lock()
    {
        while ( latch.test_and_set(std::memory_order_acquire) ) {
            sst_pause();
#if defined(__PPC64__)
            __sync_synchronize();
#endif
        }
    }

    inline void unlock() { latch.clear(std::memory_order_release); }
};
class EmptySpinlock
{
public:
    inline void lock() {}

    inline void unlock() {}
};
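// Usage sketch for Spinlock (illustrative only). Because Spinlock exposes
// lock()/unlock(), it satisfies BasicLockable and can be used with
// std::lock_guard; the shared counter below is a hypothetical example, not
// part of this header:
//
//   #include <cstdint>
//   #include <mutex>
//
//   SST::Core::ThreadSafe::Spinlock counter_lock;
//   uint64_t                        shared_counter = 0;
//
//   void bump_counter()
//   {
//       std::lock_guard<SST::Core::ThreadSafe::Spinlock> guard(counter_lock);
//       ++shared_counter; // protected by the spinlock
//   }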
template <typename T>
class BoundedQueue
{
    struct cell_t
    {
        std::atomic<size_t> sequence;
        T                   data;
    };

    bool    initialized;
    size_t  dsize;
    cell_t* data;
    CACHE_ALIGNED(std::atomic<size_t>, rPtr);
    CACHE_ALIGNED(std::atomic<size_t>, wPtr);
public:
    BoundedQueue(size_t maxSize) : initialized(false) { initialize(maxSize); }

    BoundedQueue() : initialized(false) {}

    void initialize(size_t maxSize)
    {
        if ( initialized ) return;
        dsize = maxSize;
        data  = new cell_t[dsize];
        for ( size_t i = 0; i < maxSize; i++ )
            data[i].sequence.store(i);
        rPtr.store(0);
        wPtr.store(0);
        initialized = true;
    }
    ~BoundedQueue()
    {
        if ( initialized ) delete[] data;
    }
    size_t size() const { return (wPtr.load() - rPtr.load()); }

    bool empty() const { return (rPtr.load() == wPtr.load()); }
    bool try_insert(const T& arg)
    {
        cell_t* cell = nullptr;
        size_t  pos  = wPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell          = &data[pos % dsize];
            size_t   seq  = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if ( 0 == diff ) {
                if ( wPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                return false; /* queue is full */
            }
            else {
                pos = wPtr.load(std::memory_order_relaxed);
            }
        }
        cell->data = arg;
        cell->sequence.store(pos + 1, std::memory_order_release);
        return true;
    }
    bool try_remove(T& res)
    {
        cell_t* cell = nullptr;
        size_t  pos  = rPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell          = &data[pos % dsize];
            size_t   seq  = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if ( 0 == diff ) {
                if ( rPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                return false; /* queue is empty */
            }
            else {
                pos = rPtr.load(std::memory_order_relaxed);
            }
        }
        res = cell->data;
        cell->sequence.store(pos + dsize, std::memory_order_release);
        return true;
    }
    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) { return res; }
            sst_pause();
        }
    }
};
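// Usage sketch for BoundedQueue (illustrative only; the capacity and the
// producer/consumer functions are hypothetical). try_insert() fails when the
// ring is full and try_remove() fails when it is empty, so callers decide
// whether to retry, back off, or drop:
//
//   SST::Core::ThreadSafe::BoundedQueue<int> queue(256);
//
//   void producer()
//   {
//       for ( int i = 0; i < 1000; i++ ) {
//           while ( !queue.try_insert(i) )
//               sst_pause(); // back off while the ring is full
//       }
//   }
//
//   void consumer()
//   {
//       for ( int i = 0; i < 1000; i++ ) {
//           int value = queue.remove(); // spins until an item is available
//           (void)value;
//       }
//   }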
template <typename T>
class UnboundedQueue
{
    struct CACHE_ALIGNED_T Node
    {
        std::atomic<Node*> next;
        T                  data;

        Node() : next(nullptr) {}
    };

    CACHE_ALIGNED(Node*, first);
    CACHE_ALIGNED(Node*, last);
    CACHE_ALIGNED(Spinlock, consumerLock);
    CACHE_ALIGNED(Spinlock, producerLock);

public:
    UnboundedQueue()
    {
        /* 'first' is a dummy sentinel node */
        first = last = new Node();
    }
    ~UnboundedQueue()
    {
        while ( first != nullptr ) { // release the list
            Node* tmp = first;
            first     = tmp->next;
            delete tmp;
        }
    }
    void insert(const T& t)
    {
        Node* tmp = new Node();
        tmp->data = t;
        std::lock_guard<Spinlock> lock(producerLock);
        last->next = tmp; // publish to consumers
        last       = tmp; // swing last forward
    }
    bool try_remove(T& result)
    {
        std::lock_guard<Spinlock> lock(consumerLock);
        Node*                     theFirst = first;
        Node*                     theNext  = first->next;
        if ( theNext != nullptr ) { // queue is nonempty
            result = theNext->data; // take the value out
            first  = theNext;       // swing first forward
            delete theFirst;        // delete the old dummy
            return true;
        }
        return false;
    }
    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) { return res; }
            sst_pause();
        }
    }
};
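// Usage sketch for UnboundedQueue (illustrative only; the string payload is a
// hypothetical example). Unlike BoundedQueue, insert() always succeeds because
// it allocates a node per element, while try_remove() still reports emptiness:
//
//   #include <string>
//
//   SST::Core::ThreadSafe::UnboundedQueue<std::string> messages;
//
//   void post(const std::string& msg) { messages.insert(msg); }
//
//   bool poll(std::string& out) { return messages.try_remove(out); }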
template <typename T>
void
atomic_fetch_max(std::atomic<T>& max_value, T const& new_value) noexcept
{
    T old_value = max_value;
    while ( old_value < new_value && !max_value.compare_exchange_weak(old_value, new_value) ) {}
}
template <typename T>
void
atomic_fetch_min(std::atomic<T>& min_value, T const& new_value) noexcept
{
    T old_value = min_value;
    while ( old_value > new_value && !min_value.compare_exchange_weak(old_value, new_value) ) {}
}
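// Usage sketch for atomic_fetch_max/atomic_fetch_min (illustrative only; the
// latency counters are hypothetical). Each helper retries its CAS only while
// the stored value would actually change, so concurrent updates converge on
// the true extremum:
//
//   #include <atomic>
//   #include <cstdint>
//
//   std::atomic<uint64_t> max_latency{0};
//   std::atomic<uint64_t> min_latency{UINT64_MAX};
//
//   void record_latency(uint64_t ns)
//   {
//       SST::Core::ThreadSafe::atomic_fetch_max(max_latency, ns);
//       SST::Core::ThreadSafe::atomic_fetch_min(min_latency, ns);
//   }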
} // namespace ThreadSafe
} // namespace Core
} // namespace SST

#endif // SST_CORE_THREADSAFE_H