#ifndef SST_CORE_CORE_THREADSAFE_H
#define SST_CORE_CORE_THREADSAFE_H

#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
#include <x86intrin.h>
#endif

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <time.h>

#include <sst/core/profile.h>

namespace ThreadSafe {
 
/* Pad and align members to a 64-byte cache line to avoid false sharing.
 * GCC older than 4.8 lacks alignas(), so fall back to the GNU attribute. */
#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8 ))
#    define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
#    define CACHE_ALIGNED_T
#else
#    define CACHE_ALIGNED(type, name) alignas(64) type name
#    define CACHE_ALIGNED_T alignas(64)
#endif
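
// For example, CACHE_ALIGNED(std::atomic<size_t>, rPtr) expands to
//     alignas(64) std::atomic<size_t> rPtr
// (or the __attribute__((aligned(64))) form on old GCC), so that heavily
// written members land on separate 64-byte cache lines.
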
class CACHE_ALIGNED_T Barrier {
    size_t origCount;
    std::atomic<bool> enabled;
    std::atomic<size_t> count, generation;

public:
    Barrier(size_t count) : origCount(count), enabled(true),
            count(count), generation(0)
    { }

    Barrier() : origCount(0), enabled(false), count(0), generation(0)
    { }

    /** ONLY call this while nobody is in wait() */
    void resize(size_t newCount)
    {
        count = origCount = newCount;
        generation.store(0);
        enabled.store(true);
    }
 
    /**
     * Wait for all threads to reach this point.
     * @return the time spent waiting (0.0 when the barrier is disabled)
     */
    double wait()
    {
        double elapsed = 0.0;
        if ( enabled ) {
            auto startTime = SST::Core::Profile::now();

            size_t gen = generation.load(std::memory_order_acquire);
            size_t c   = count.fetch_sub(1) - 1;
            if ( 0 == c ) {
                /* Last thread to arrive: reset the count for the next round,
                 * then advance the generation to release the waiters. */
                count.store(origCount);
                generation.fetch_add(1, std::memory_order_release);
            } else {
                /* Spin briefly, then yield, then sleep until the generation
                 * changes. */
                uint32_t spins = 0;
                do {
                    spins++;
                    if ( spins < 1024 ) {
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
                        _mm_pause();
#elif defined(__PPC64__)
                        asm volatile( "or 27, 27, 27" ::: "memory" );
#endif
                    } else if ( spins < (1024*1024) ) {
                        std::this_thread::yield();
                    } else {
                        struct timespec ts;
                        ts.tv_sec  = 0;
                        ts.tv_nsec = 1000;
                        nanosleep(&ts, NULL);
                    }
                } while ( gen == generation.load(std::memory_order_acquire) );
            }
            elapsed = SST::Core::Profile::getElapsed(startTime);
        }
        return elapsed;
    }

    /** Disable the barrier: subsequent calls to wait() return immediately. */
    void disable()
    {
        enabled.store(false);
    }
};
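
/*
 * Usage sketch (not part of the original header): a fixed group of threads
 * can reuse one Barrier to line up between phases of work.  The thread count
 * and do_phase() below are hypothetical stand-ins.
 *
 *   void do_phase(ThreadSafe::Barrier& barrier, int nPhases)
 *   {
 *       for ( int p = 0 ; p < nPhases ; p++ ) {
 *           // ... perform this thread's share of phase p ...
 *           barrier.wait();   // no thread starts phase p+1 until all finish p
 *       }
 *   }
 *
 *   // Hypothetical driver: construct Barrier barrier(nThreads), launch
 *   // nThreads std::threads running do_phase(barrier, nPhases), then join.
 */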
 
class Spinlock {
    std::atomic_flag latch = ATOMIC_FLAG_INIT;

public:
    Spinlock() { }

    inline void lock() {
        /* Spin until the flag is acquired, hinting the CPU on each retry. */
        while ( latch.test_and_set(std::memory_order_acquire) ) {
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
            _mm_pause();
#elif defined(__PPC64__)
            asm volatile( "or 27, 27, 27" ::: "memory" );
            __sync_synchronize();
#endif
        }
    }

    inline void unlock() {
        latch.clear(std::memory_order_release);
    }
};
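
/*
 * Usage sketch (not part of the original header): Spinlock provides
 * lock()/unlock(), i.e. the BasicLockable interface, so it works with
 * std::lock_guard exactly as the queues below use it.  The shared counter
 * here is a hypothetical example of a short critical section, which is the
 * case a spinlock is suited for.
 *
 *   ThreadSafe::Spinlock counterLock;
 *   long counter = 0;
 *
 *   void bump()
 *   {
 *       std::lock_guard<ThreadSafe::Spinlock> guard(counterLock);
 *       ++counter;   // protected; unlock() runs automatically at scope exit
 *   }
 */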
 
template<typename T>
class BoundedQueue {

    struct cell_t {
        std::atomic<size_t> sequence;
        T data;
    };

    bool initialized;
    size_t dsize;
    cell_t* data;
    CACHE_ALIGNED(std::atomic<size_t>, rPtr);
    CACHE_ALIGNED(std::atomic<size_t>, wPtr);

public:
    BoundedQueue() : initialized(false), dsize(0), data(nullptr) { }

    BoundedQueue(size_t maxSize) : initialized(false) { initialize(maxSize); }

    void initialize(size_t maxSize) {
        if ( initialized ) return;
        dsize = maxSize;
        data = new cell_t[dsize];
        /* A cell is free when its sequence equals the write position, so
         * every cell starts out with sequence == its own index. */
        for ( size_t i = 0 ; i < maxSize ; i++ )
            data[i].sequence.store(i);
        rPtr.store(0);
        wPtr.store(0);
        initialized = true;
    }

    ~BoundedQueue() {
        if ( initialized ) delete [] data;
    }

    size_t size() const {
        return (wPtr.load() - rPtr.load());
    }

    bool empty() const {
        return (rPtr.load() == wPtr.load());
    }
 
    bool try_insert(const T& arg) {
        cell_t* cell = NULL;
        size_t pos = wPtr.load(std::memory_order_relaxed);
        for (;;) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if ( 0 == diff ) {
                /* Cell is free for this position; try to claim the slot. */
                if ( wPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
                    break;
            } else if ( 0 > diff ) {
                /* Queue is full. */
                return false;
            } else {
                /* Another producer claimed the slot; re-read and retry. */
                pos = wPtr.load(std::memory_order_relaxed);
            }
        }
        cell->data = arg;
        cell->sequence.store(pos+1, std::memory_order_release);
        return true;
    }
 
    bool try_remove(T &res) {
        cell_t* cell = NULL;
        size_t pos = rPtr.load(std::memory_order_relaxed);
        for (;;) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if ( 0 == diff ) {
                /* Cell holds data for this position; try to claim it. */
                if ( rPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
                    break;
            } else if ( 0 > diff ) {
                /* Queue is empty. */
                return false;
            } else {
                /* Another consumer claimed the slot; re-read and retry. */
                pos = rPtr.load(std::memory_order_relaxed);
            }
        }
        res = cell->data;
        /* Mark the cell free for the producer dsize positions later. */
        cell->sequence.store(pos + dsize, std::memory_order_release);
        return true;
    }

    T remove() {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
            _mm_pause();
#elif defined(__PPC64__)
            asm volatile( "or 27, 27, 27" ::: "memory" );
#endif
        }
    }
};
 
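/*
 * Usage sketch (not part of the original header): BoundedQueue is a
 * fixed-capacity multi-producer/multi-consumer queue.  try_insert() returns
 * false when the queue is full, try_remove() returns false when it is empty,
 * and remove() spins until an element is available.  The Item type and the
 * capacity of 1024 are hypothetical.
 *
 *   struct Item { int payload; };
 *
 *   ThreadSafe::BoundedQueue<Item> queue;
 *   queue.initialize(1024);                  // capacity is fixed up front
 *
 *   // Producer thread:
 *   while ( !queue.try_insert(Item{7}) )
 *       std::this_thread::yield();           // back off while the queue is full
 *
 *   // Consumer thread:
 *   Item item = queue.remove();              // spins until data arrives
 */
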
template<typename T>
class UnboundedQueue {

    struct CACHE_ALIGNED_T Node {
        std::atomic<Node*> next;
        T data;

        Node() : next(nullptr) { }
    };

    CACHE_ALIGNED(Node*, first);
    CACHE_ALIGNED(Node*, last);
    CACHE_ALIGNED(Spinlock, consumerLock);
    CACHE_ALIGNED(Spinlock, producerLock);

public:
    UnboundedQueue() {
        /* 'first' always points at a dummy node, which keeps the empty and
         * non-empty cases uniform. */
        first = last = new Node();
    }
 
    ~UnboundedQueue() {
        while ( first != nullptr ) {   // release the list, dummy included
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }

    void insert(const T& t) {
        Node* tmp = new Node();
        tmp->data = t;
        std::lock_guard<Spinlock> lock(producerLock);
        last->next = tmp;   // publish the new node to consumers
        last = tmp;         // swing last to the new node
    }
 
    bool try_remove( T& result) {
        std::lock_guard<Spinlock> lock(consumerLock);
        Node* theFirst = first;
        Node* theNext  = first->next;
        if ( theNext != nullptr ) {    // queue is non-empty
            result = theNext->data;    // copy the value out
            first  = theNext;          // the old head becomes the new dummy
            delete theFirst;           // free the previous dummy node
            return true;
        }
        return false;
    }
 
    T remove() {
        while ( 1 ) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
            _mm_pause();
#elif defined(__PPC64__)
            asm volatile( "or 27, 27, 27" ::: "memory" );
#endif
        }
    }
};
 
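/*
 * Usage sketch (not part of the original header): UnboundedQueue is a
 * linked-list multi-producer/multi-consumer queue with a dummy head node;
 * producers and consumers serialize on separate Spinlocks, so they mostly
 * do not contend with each other.  The std::string payload is hypothetical.
 *
 *   ThreadSafe::UnboundedQueue<std::string> mailbox;
 *
 *   // Producer thread:
 *   mailbox.insert("hello");            // grows as needed; never "full"
 *
 *   // Consumer thread:
 *   std::string msg;
 *   if ( mailbox.try_remove(msg) ) {
 *       // ... handle msg without blocking ...
 *   } else {
 *       msg = mailbox.remove();         // or spin until a message arrives
 *   }
 */
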
} // namespace ThreadSafe

#endif // SST_CORE_CORE_THREADSAFE_H