#ifndef SST_CORE_THREADSAFE_H
#define SST_CORE_THREADSAFE_H

#if ( defined(__amd64) || defined(__amd64__) || defined(__x86_64) || defined(__x86_64__) )
#include <x86intrin.h>
#define sst_pause() _mm_pause()
#elif ( defined(__arm__) || defined(__arm) || defined(__aarch64__) )
#define sst_pause() __asm__ __volatile__("yield")
#elif defined(__PPC64__)
#define sst_pause() __asm__ __volatile__("or 27, 27, 27" ::: "memory");
#else
#define sst_pause() std::this_thread::yield()
#endif

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

#include <time.h>

#include "sst/core/profile.h"

namespace SST {
namespace Core {
namespace ThreadSafe {

#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8))
#define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
#define CACHE_ALIGNED_T
#else
#define CACHE_ALIGNED(type, name) alignas(64) type name
#define CACHE_ALIGNED_T alignas(64)
#endif

/** Barrier that blocks each calling thread until all participating threads arrive. */
class CACHE_ALIGNED_T Barrier
{
    size_t            origCount;
    std::atomic<bool> enabled;
    std::atomic<size_t> count, generation;

public:
    Barrier(size_t count) : origCount(count), enabled(true), count(count), generation(0) {}

    Barrier() : origCount(0), enabled(false), count(0), generation(0) {}

    /** Resize the barrier.  ONLY call this while nobody is in wait(). */
    void resize(size_t newCount)
    {
        count = origCount = newCount;
        generation.store(0);
        enabled.store(true);
    }
    /** Wait for all threads to reach this point. */
    double wait()
    {
        double elapsed = 0.0;
        if ( enabled ) {
            auto startTime = SST::Core::Profile::now();

            size_t gen = generation.load(std::memory_order_acquire);
            size_t c   = count.fetch_sub(1) - 1;
            if ( 0 == c ) {
                /* Last thread to arrive: reset the count and release the waiters */
                count.store(origCount);
                generation.fetch_add(1, std::memory_order_release);
            }
            else {
                /* Spin first, then yield, then sleep until the generation advances */
                uint32_t spins = 0;
                do {
                    spins++;
                    if ( spins < 1024 ) {
                        sst_pause();
                    }
                    else if ( spins < (1024 * 1024) ) {
                        std::this_thread::yield();
                    }
                    else {
                        struct timespec ts;
                        ts.tv_sec  = 0;
                        ts.tv_nsec = 1000;
                        nanosleep(&ts, nullptr);
                    }
                } while ( gen == generation.load(std::memory_order_acquire) );
            }
            elapsed = SST::Core::Profile::getElapsed(startTime);
        }
        return elapsed;
    }

    /** Disable the barrier; subsequent wait() calls return immediately. */
    void disable()
    {
        enabled.store(false);
        count.store(0);
        ++generation;
    }
};
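/* Illustrative usage sketch (not part of this header): synchronizing a pool of worker
 * threads with Barrier.  Assumes <thread> and <vector>; do_phase_one()/do_phase_two()
 * are hypothetical per-thread work functions.
 *
 *   SST::Core::ThreadSafe::Barrier barrier(nThreads);
 *   std::vector<std::thread> workers;
 *   for ( size_t i = 0; i < nThreads; i++ ) {
 *       workers.emplace_back([&barrier, i]() {
 *           do_phase_one(i);
 *           barrier.wait();   // no thread starts phase two until all finish phase one
 *           do_phase_two(i);
 *           barrier.wait();
 *       });
 *   }
 *   for ( auto& t : workers ) t.join();
 */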
class Spinlock
{
    std::atomic_flag latch = ATOMIC_FLAG_INIT;

public:
    Spinlock() {}

    inline void lock()
    {
        while ( latch.test_and_set(std::memory_order_acquire) ) {
            sst_pause();
#if defined(__PPC64__)
            __sync_synchronize();
#endif
        }
    }

    inline void unlock() { latch.clear(std::memory_order_release); }
};
/** Lock with the same interface as Spinlock that performs no synchronization. */
class EmptySpinlock
{
public:
    EmptySpinlock() {}

    inline void lock() {}

    inline void unlock() {}
};
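/* Illustrative usage sketch (not part of this header): Spinlock provides lock()/unlock(),
 * so it satisfies BasicLockable and works with std::lock_guard.  EmptySpinlock has the
 * same interface but does nothing, so a typedef or template parameter can select the lock
 * type without changing the calling code.  counterLock and counter are hypothetical.
 *
 *   SST::Core::ThreadSafe::Spinlock counterLock;
 *   uint64_t                        counter = 0;
 *
 *   void increment()
 *   {
 *       std::lock_guard<SST::Core::ThreadSafe::Spinlock> guard(counterLock);
 *       ++counter; // short critical section protected by the spinlock
 *   }
 */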
template <typename T>
class BoundedQueue
{
    struct cell_t
    {
        std::atomic<size_t> sequence;
        T                   data;
    };

    bool    initialized;
    size_t  dsize;
    cell_t* data;
    CACHE_ALIGNED(std::atomic<size_t>, rPtr);
    CACHE_ALIGNED(std::atomic<size_t>, wPtr);

public:
    BoundedQueue(size_t maxSize) : initialized(false) { initialize(maxSize); }

    BoundedQueue() : initialized(false) {}

    void initialize(size_t maxSize)
    {
        if ( initialized ) return;
        dsize = maxSize;
        data  = new cell_t[dsize];
        for ( size_t i = 0; i < maxSize; i++ )
            data[i].sequence.store(i);
        rPtr.store(0);
        wPtr.store(0);
        initialized = true;
    }
    ~BoundedQueue()
    {
        if ( initialized ) delete[] data;
    }
    size_t size() const { return (wPtr.load() - rPtr.load()); }

    bool empty() const { return (rPtr.load() == wPtr.load()); }
    bool try_insert(const T& arg)
    {
        cell_t* cell = nullptr;
        size_t  pos  = wPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell          = &data[pos % dsize];
            size_t   seq  = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if ( 0 == diff ) {
                // Slot is free; claim it by advancing the write pointer
                if ( wPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                // Queue is full
                return false;
            }
            else {
                // Another producer claimed this slot; reload and retry
                pos = wPtr.load(std::memory_order_relaxed);
            }
        }
        cell->data = arg;
        cell->sequence.store(pos + 1, std::memory_order_release);
        return true;
    }
    bool try_remove(T& res)
    {
        cell_t* cell = nullptr;
        size_t  pos  = rPtr.load(std::memory_order_relaxed);
        for ( ;; ) {
            cell          = &data[pos % dsize];
            size_t   seq  = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if ( 0 == diff ) {
                // Slot holds data; claim it by advancing the read pointer
                if ( rPtr.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed) ) break;
            }
            else if ( 0 > diff ) {
                // Queue is empty
                return false;
            }
            else {
                // Another consumer claimed this slot; reload and retry
                pos = rPtr.load(std::memory_order_relaxed);
            }
        }
        res = cell->data;
        cell->sequence.store(pos + dsize, std::memory_order_release);
        return true;
    }
    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) { return res; }
            sst_pause();
        }
    }
};
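/* Illustrative usage sketch (not part of this header): BoundedQueue is a fixed-capacity
 * multi-producer/multi-consumer queue.  try_insert() returns false when full and
 * try_remove() returns false when empty, so callers choose their own back-off; remove()
 * blocks by spinning.  workItem and process() are hypothetical.
 *
 *   SST::Core::ThreadSafe::BoundedQueue<int> queue(1024);
 *
 *   // Producer
 *   while ( !queue.try_insert(workItem) ) sst_pause();
 *
 *   // Consumer (non-blocking, then blocking)
 *   int item;
 *   if ( queue.try_remove(item) ) process(item);
 *   int next = queue.remove();
 */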
template <typename T>
class UnboundedQueue
{
    struct CACHE_ALIGNED_T Node
    {
        std::atomic<Node*> next;
        T                  data;

        Node() : next(nullptr) {}
    };

    CACHE_ALIGNED(Node*, first);
    CACHE_ALIGNED(Node*, last);
    CACHE_ALIGNED(Spinlock, consumerLock);
    CACHE_ALIGNED(Spinlock, producerLock);

public:
    UnboundedQueue()
    {
        /* 'first' is a dummy sentinel node */
        first = last = new Node();
    }
    ~UnboundedQueue()
    {
        while ( first != nullptr ) {
            // Release the list, including the sentinel node
            Node* tmp = first;
            first     = tmp->next;
            delete tmp;
        }
    }
    void insert(const T& t)
    {
        Node* tmp = new Node();
        tmp->data = t;
        std::lock_guard<Spinlock> lock(producerLock);
        last->next = tmp; // publish to consumers
        last       = tmp; // swing last forward
    }
    bool try_remove(T& result)
    {
        std::lock_guard<Spinlock> lock(consumerLock);
        Node* theFirst = first;
        Node* theNext  = first->next;
        if ( theNext != nullptr ) { // queue is nonempty
            result = theNext->data; // copy the data out
            first  = theNext;       // the next node becomes the new sentinel
            delete theFirst;        // free the old sentinel
            return true;
        }
        return false; // queue was empty
    }
    T remove()
    {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) { return res; }
            sst_pause();
        }
    }
};
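/* Illustrative usage sketch (not part of this header): UnboundedQueue allocates a node
 * per element, so insert() never fails; consumers poll with try_remove() or block with
 * remove().  Event and handle() are hypothetical.
 *
 *   SST::Core::ThreadSafe::UnboundedQueue<Event*> events;
 *
 *   // Producer
 *   events.insert(ev);
 *
 *   // Consumer
 *   Event* e = nullptr;
 *   while ( events.try_remove(e) ) handle(e);
 */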
/** Atomically set max_value to new_value if new_value is greater. */
template <typename T>
void
atomic_fetch_max(std::atomic<T>& max_value, T const& new_value) noexcept
{
    T old_value = max_value;
    while ( old_value < new_value && !max_value.compare_exchange_weak(old_value, new_value) ) {}
}
/** Atomically set min_value to new_value if new_value is smaller. */
template <typename T>
void
atomic_fetch_min(std::atomic<T>& min_value, T const& new_value) noexcept
{
    T old_value = min_value;
    while ( old_value > new_value && !min_value.compare_exchange_weak(old_value, new_value) ) {}
}
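/* Illustrative usage sketch (not part of this header): atomic_fetch_max/atomic_fetch_min
 * maintain a shared extreme value without a lock, e.g. tracking the largest and smallest
 * latency observed across threads.  record() and the latency variables are hypothetical.
 *
 *   std::atomic<uint64_t> maxLatency(0);
 *   std::atomic<uint64_t> minLatency(UINT64_MAX);
 *
 *   void record(uint64_t latency)
 *   {
 *       SST::Core::ThreadSafe::atomic_fetch_max(maxLatency, latency);
 *       SST::Core::ThreadSafe::atomic_fetch_min(minLatency, latency);
 *   }
 */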
} // namespace ThreadSafe
} // namespace Core
} // namespace SST

#endif // SST_CORE_THREADSAFE_H