SST 7.2.0 - Structural Simulation Toolkit
threadsafe.h
// Copyright 2009-2017 Sandia Corporation. Under the terms
// of Contract DE-NA0003525 with Sandia Corporation, the U.S.
// Government retains certain rights in this software.
//
// Copyright (c) 2009-2014, Sandia Corporation
// All rights reserved.
//
// This file is part of the SST software package. For license
// information, see the LICENSE file in the top level directory of the
// distribution.

#ifndef SST_CORE_CORE_THREADSAFE_H
#define SST_CORE_CORE_THREADSAFE_H

#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
#include <x86intrin.h>
#endif

#include <thread>
#include <atomic>
#include <condition_variable>
#include <mutex>

#include <vector>
//#include <stdalign.h>

#include <time.h>

#include <sst/core/profile.h>

namespace SST {
namespace Core {
namespace ThreadSafe {

#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8 ))
# define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
# define CACHE_ALIGNED_T
#else
# define CACHE_ALIGNED(type, name) alignas(64) type name
# define CACHE_ALIGNED_T alignas(64)
#endif

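/* Illustrative sketch (assumption; not part of the original header). The
 * macros above pad each annotated member onto its own 64-byte cache line so
 * that two threads updating different members do not false-share. The struct
 * and member names below are made up for illustration only. */
struct CACHE_ALIGNED_T ExampleCounters {
    CACHE_ALIGNED(std::atomic<size_t>, produced);   // written by producer threads
    CACHE_ALIGNED(std::atomic<size_t>, consumed);   // written by consumer threads
};
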
class CACHE_ALIGNED_T Barrier {
    size_t origCount;
    std::atomic<bool> enabled;
    std::atomic<size_t> count, generation;
public:
    Barrier(size_t count) : origCount(count), enabled(true),
            count(count), generation(0)
    { }

    // Come g++ 4.7, this can become a delegating constructor
    Barrier() : origCount(0), enabled(false), count(0), generation(0)
    { }


    /** ONLY call this while nobody is in wait() */
    void resize(size_t newCount)
    {
        count = origCount = newCount;
        generation.store(0);
        enabled.store(true);
    }


    /**
     * Wait for all threads to reach this point.
     * @return 0.0, or elapsed time spent waiting, if configured with --enable-profile
     */
    double wait()
    {
        double elapsed = 0.0;
        if ( enabled ) {
            auto startTime = SST::Core::Profile::now();

            size_t gen = generation.load(std::memory_order_acquire);
            asm("":::"memory");
            size_t c = count.fetch_sub(1) - 1;
            if ( 0 == c ) {
                /* We should release */
                count.store(origCount);
                asm("":::"memory");
                /* Incrementing generation causes release */
                generation.fetch_add(1, std::memory_order_release);
                __sync_synchronize();
            } else {
                /* Try spinning first */
                uint32_t count = 0;
                do {
                    count++;
                    if ( count < 1024 ) {
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
                        _mm_pause();
#elif defined(__PPC64__)
                        asm volatile( "or 27, 27, 27" ::: "memory" );
#endif
                    } else if ( count < (1024*1024) ) {
                        std::this_thread::yield();
                    } else {
                        struct timespec ts;
                        ts.tv_sec = 0;
                        ts.tv_nsec = 1000;
                        nanosleep(&ts, NULL);
                    }
                } while ( gen == generation.load(std::memory_order_acquire) );
            }
            elapsed = SST::Core::Profile::getElapsed(startTime);
        }
        return elapsed;
    }

    void disable()
    {
        enabled.store(false);
        count.store(0);
        ++generation;
    }
};
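
/* Usage sketch (illustrative; not part of the original header). Each of N
 * worker threads calls wait() at the end of a phase; the last arrival resets
 * the count and bumps the generation, releasing the others so all threads
 * enter the next phase together. The function name is hypothetical. */
inline void barrierExampleWorker(Barrier &phaseBarrier)
{
    // ... per-thread work for this phase ...
    phaseBarrier.wait();    // returns once all N threads have arrived
    // ... all threads are now past the barrier ...
}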


#if 0
typedef std::mutex Spinlock;
#else
class Spinlock {
    std::atomic_flag latch = ATOMIC_FLAG_INIT;
public:
    Spinlock() { }

    inline void lock() {
        while ( latch.test_and_set(std::memory_order_acquire) ) {
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
            _mm_pause();
#elif defined(__PPC64__)
            asm volatile( "or 27, 27, 27" ::: "memory" );
            __sync_synchronize();
#endif
        }
    }

    inline void unlock() {
        latch.clear(std::memory_order_release);
    }
};
#endif
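
/* Usage sketch (illustrative; not part of the original header). Spinlock
 * provides lock()/unlock(), so it satisfies the BasicLockable requirements
 * and can be used with std::lock_guard for short critical sections. */
inline void spinlockExample(Spinlock &lock, int &sharedCounter)
{
    std::lock_guard<Spinlock> guard(lock);  // busy-waits until the lock is acquired
    ++sharedCounter;                        // keep the critical section brief
}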


template<typename T>
class BoundedQueue {

    struct cell_t {
        std::atomic<size_t> sequence;
        T data;
    };

    bool initialized;
    size_t dsize;
    cell_t *data;
    CACHE_ALIGNED(std::atomic<size_t>, rPtr);
    CACHE_ALIGNED(std::atomic<size_t>, wPtr);

public:
    // BoundedQueue(size_t maxSize) : dsize(maxSize)
    BoundedQueue(size_t maxSize) : initialized(false)
    {
        initialize(maxSize);
    }

    BoundedQueue() : initialized(false) {}

    void initialize(size_t maxSize) {
        if ( initialized ) return;
        dsize = maxSize;
        data = new cell_t[dsize];
        for ( size_t i = 0 ; i < maxSize ; i++ )
            data[i].sequence.store(i);
        rPtr.store(0);
        wPtr.store(0);
        //fprintf(stderr, "%p %p: %ld\n", &rPtr, &wPtr, ((intptr_t)&wPtr - (intptr_t)&rPtr));
        initialized = true;
    }

    ~BoundedQueue()
    {
        if ( initialized ) delete [] data;
    }

    size_t size() const
    {
        return (wPtr.load() - rPtr.load());
    }

    bool empty() const
    {
        return (rPtr.load() == wPtr.load());
    }

    bool try_insert(const T& arg)
    {
        cell_t *cell = NULL;
        size_t pos = wPtr.load(std::memory_order_relaxed);
        for (;;) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if ( 0 == diff ) {
                if ( wPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
                    break;
            } else if ( 0 > diff ) {
                // fprintf(stderr, "diff = %ld: %zu - %zu\n", diff, seq, pos);
                return false;
            } else {
                pos = wPtr.load(std::memory_order_relaxed);
            }
        }
        cell->data = arg;
        cell->sequence.store(pos+1, std::memory_order_release);
        return true;
    }

    bool try_remove(T &res)
    {
        cell_t *cell = NULL;
        size_t pos = rPtr.load(std::memory_order_relaxed);
        for (;;) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if ( 0 == diff ) {
                if ( rPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
                    break;
            } else if ( 0 > diff ) {
                return false;
            } else {
                pos = rPtr.load(std::memory_order_relaxed);
            }
        }
        res = cell->data;
        cell->sequence.store(pos + dsize, std::memory_order_release);
        return true;
    }

    T remove() {
        while(1) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
            _mm_pause();
#elif defined(__PPC64__)
            asm volatile( "or 27, 27, 27" ::: "memory" );
#endif
        }
    }
};
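
/* Usage sketch (illustrative; not part of the original header). The queue
 * holds at most the capacity passed at construction; try_insert() and
 * try_remove() return false instead of blocking when the queue is full or
 * empty, while remove() spins until an element arrives. The function name
 * below is hypothetical. */
inline void boundedQueueExample()
{
    BoundedQueue<int> queue(64);        // fixed capacity of 64 elements
    if ( queue.try_insert(42) ) {       // returns false when the queue is full
        int value = queue.remove();     // spins until an element is available
        (void) value;
    }
}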

template<typename T>
class UnboundedQueue {
    struct CACHE_ALIGNED_T Node {
        std::atomic<Node*> next;
        T data;

        Node() : next(nullptr) { }
    };

    CACHE_ALIGNED(Node*, first);
    CACHE_ALIGNED(Node*, last);
    CACHE_ALIGNED(Spinlock, consumerLock);
    CACHE_ALIGNED(Spinlock, producerLock);

public:
    UnboundedQueue() {
        /* 'first' is a dummy value */
        first = last = new Node();
    }

    ~UnboundedQueue() {
        while( first != nullptr ) {         // release the list
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }

    void insert(const T& t) {
        Node* tmp = new Node();
        tmp->data = t;
        std::lock_guard<Spinlock> lock(producerLock);
        last->next = tmp;                   // publish to consumers
        last = tmp;                         // swing last forward
    }

    bool try_remove( T& result) {
        std::lock_guard<Spinlock> lock(consumerLock);
        Node* theFirst = first;
        Node* theNext = first->next;
        if( theNext != nullptr ) {          // if queue is nonempty
            result = theNext->data;         // take it out
            first = theNext;                // swing first forward
            delete theFirst;                // delete the old dummy
            return true;
        }
        return false;
    }

    T remove() {
        while(1) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
#if ( defined( __amd64 ) || defined( __amd64__ ) || \
      defined( __x86_64 ) || defined( __x86_64__ ) )
            _mm_pause();
#elif defined(__PPC64__)
            asm volatile( "or 27, 27, 27" ::: "memory" );
#endif
        }
    }

};
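
/* Usage sketch (illustrative; not part of the original header). Producers and
 * consumers are serialized by separate spinlocks, so inserts never contend
 * with removals; try_remove() returns false when the queue is empty. The
 * function name below is hypothetical. */
inline void unboundedQueueExample()
{
    UnboundedQueue<int> queue;
    queue.insert(7);                    // allocates a node; never fails
    int value;
    if ( queue.try_remove(value) ) {    // false if the queue was empty
        // value == 7 here, since nothing else was queued
    }
}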

} // namespace ThreadSafe
} // namespace Core
} // namespace SST

#endif // SST_CORE_CORE_THREADSAFE_H