SST  7.1.0
StructuralSimulationToolkit
threadsafe.h
1 // Copyright 2009-2017 Sandia Corporation. Under the terms
2 // of Contract DE-NA0003525 with Sandia Corporation, the U.S.
3 // Government retains certain rights in this software.
4 //
5 // Copyright (c) 2009-2014, Sandia Corporation
6 // All rights reserved.
7 //
8 // This file is part of the SST software package. For license
9 // information, see the LICENSE file in the top level directory of the
10 // distribution.
11 
12 #ifndef SST_CORE_CORE_THREADSAFE_H
13 #define SST_CORE_CORE_THREADSAFE_H
14 
15 #include <x86intrin.h>
16 
17 #include <thread>
18 #include <atomic>
19 #include <condition_variable>
20 #include <mutex>
21 
22 #include <vector>
23 //#include <stdalign.h>
24 
25 #include <time.h>
26 
27 #include <sst/core/profile.h>
28 
29 namespace SST {
30 namespace Core {
31 namespace ThreadSafe {
32 
/*
 * Cache-line alignment helpers (64-byte alignment).
 *
 *   CACHE_ALIGNED(type, name)  declares a variable/member `name` of `type`
 *                              aligned to 64 bytes.
 *   CACHE_ALIGNED_T            goes between the class-key and the class name
 *                              to align a whole type to 64 bytes.
 *
 * For GCC 4.x older than 4.8 the GNU `aligned` attribute is used for
 * variables and CACHE_ALIGNED_T expands to nothing, so types are NOT
 * over-aligned on those compilers (presumably because `alignas` is not
 * available there). Every other compiler uses standard `alignas`.
 */
#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8 ))
# define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
# define CACHE_ALIGNED_T
#else
# define CACHE_ALIGNED(type, name) alignas(64) type name
# define CACHE_ALIGNED_T alignas(64)
#endif
40 
41 
42 class CACHE_ALIGNED_T Barrier {
43  size_t origCount;
44  std::atomic<bool> enabled;
45  std::atomic<size_t> count, generation;
46 public:
47  Barrier(size_t count) : origCount(count), enabled(true),
48  count(count), generation(0)
49  { }
50 
51  // Come g++ 4.7, this can become a delegating constructor
52  Barrier() : origCount(0), enabled(false), count(0), generation(0)
53  { }
54 
55 
56  /** ONLY call this while nobody is in wait() */
57  void resize(size_t newCount)
58  {
59  count = origCount = newCount;
60  generation.store(0);
61  enabled.store(true);
62  }
63 
64 
65  /**
66  * Wait for all threads to reach this point.
67  * @return 0.0, or elapsed time spent waiting, if configured with --enable-profile
68  */
69  double wait()
70  {
71  double elapsed = 0.0;
72  if ( enabled ) {
73  auto startTime = SST::Core::Profile::now();
74 
75  size_t gen = generation.load(std::memory_order_acquire);
76  asm("":::"memory");
77  size_t c = count.fetch_sub(1) -1;
78  if ( 0 == c ) {
79  /* We should release */
80  count.store(origCount);
81  asm("":::"memory");
82  /* Incrementing generation causes release */
83  generation.fetch_add(1, std::memory_order_release);
84  __sync_synchronize();
85  } else {
86  /* Try spinning first */
87  uint32_t count = 0;
88  do {
89  count++;
90  if ( count < 1024 )
91  _mm_pause();
92  else if ( count < (1024*1024) )
93  std::this_thread::yield();
94  else {
95  struct timespec ts;
96  ts.tv_sec = 0;
97  ts.tv_nsec = 1000;
98  nanosleep(&ts, NULL);
99  }
100  } while ( gen == generation.load(std::memory_order_acquire) );
101  }
102  elapsed = SST::Core::Profile::getElapsed(startTime);
103  }
104  return elapsed;
105  }
106 
107  void disable()
108  {
109  enabled.store(false);
110  count.store(0);
111  ++generation;
112  }
113 };
114 
115 
/**
 * Minimal test-and-test-and-set spin lock.
 *
 * Provides lock()/unlock(), so it can be used with std::lock_guard.
 * `latch` is 0 when free and 1 when held.
 */
class Spinlock {
    std::atomic<int> latch;
public:
    Spinlock() : latch(0)
    { }

    /** Spin until the lock has been acquired by the calling thread. */
    inline void lock() {
        for (;;) {
            int expected = 0;
            if ( latch.compare_exchange_weak(expected, 1,
                        std::memory_order_acquire,
                        std::memory_order_relaxed) )
                return;
            /* The exchange failed: the lock is held, or the weak CAS failed
             * spuriously. Either way we do NOT own the lock, so never fall
             * out of the loop here. Poll with plain loads until the latch
             * looks free, then retry the exchange. */
            while ( latch.load(std::memory_order_acquire) )
                _mm_pause();
        }
    }

    /** Release the lock. Must only be called by the current holder. */
    inline void unlock() {
        /* TODO: Understand why latch.store(0) breaks */
        --latch;
    }
};
140 
141 
142 
143 template<typename T>
145 
146  struct cell_t {
147  std::atomic<size_t> sequence;
148  T data;
149  };
150 
151  bool initialized;
152  size_t dsize;
153  cell_t *data;
154  CACHE_ALIGNED(std::atomic<size_t>, rPtr);
155  CACHE_ALIGNED(std::atomic<size_t>, wPtr);
156 
157 public:
158  // BoundedQueue(size_t maxSize) : dsize(maxSize)
159  BoundedQueue(size_t maxSize) : initialized(false)
160  {
161  initialize(maxSize);
162  }
163 
164  BoundedQueue() : initialized(false) {}
165 
166  void initialize(size_t maxSize) {
167  if ( initialized ) return;
168  dsize = maxSize;
169  data = new cell_t[dsize];
170  for ( size_t i = 0 ; i < maxSize ; i++ )
171  data[i].sequence.store(i);
172  rPtr.store(0);
173  wPtr.store(0);
174  //fprintf(stderr, "%p %p: %ld\n", &rPtr, &wPtr, ((intptr_t)&wPtr - (intptr_t)&rPtr));
175  initialized = true;
176  }
177 
178  ~BoundedQueue()
179  {
180  if ( initialized ) delete [] data;
181  }
182 
183  size_t size() const
184  {
185  return (wPtr.load() - rPtr.load());
186  }
187 
188  bool empty() const
189  {
190  return (rPtr.load() == wPtr.load());
191  }
192 
193  bool try_insert(const T& arg)
194  {
195  cell_t *cell = NULL;
196  size_t pos = wPtr.load(std::memory_order_relaxed);
197  for (;;) {
198  cell = &data[pos % dsize];
199  size_t seq = cell->sequence.load(std::memory_order_acquire);
200  intptr_t diff = (intptr_t)seq - (intptr_t)pos;
201  if ( 0 == diff ) {
202  if ( wPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
203  break;
204  } else if ( 0 > diff ) {
205  // fprintf(stderr, "diff = %ld: %zu - %zu\n", diff, seq, pos);
206  return false;
207  } else {
208  pos = wPtr.load(std::memory_order_relaxed);
209  }
210  }
211  cell->data = arg;
212  cell->sequence.store(pos+1, std::memory_order_release);
213  return true;
214  }
215 
216  bool try_remove(T &res)
217  {
218  cell_t *cell = NULL;
219  size_t pos = rPtr.load(std::memory_order_relaxed);
220  for (;;) {
221  cell = &data[pos % dsize];
222  size_t seq = cell->sequence.load(std::memory_order_acquire);
223  intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
224  if ( 0 == diff ) {
225  if ( rPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
226  break;
227  } else if ( 0 > diff ) {
228  return false;
229  } else {
230  pos = rPtr.load(std::memory_order_relaxed);
231  }
232  }
233  res = cell->data;
234  cell->sequence.store(pos + dsize, std::memory_order_release);
235  return true;
236  }
237 
238  T remove() {
239  while(1) {
240  T res;
241  if ( try_remove(res) ) {
242  return res;
243  }
244  _mm_pause();
245  }
246  }
247 };
248 
249 template<typename T>
251  struct CACHE_ALIGNED_T Node {
252  std::atomic<Node*> next;
253  T data;
254 
255  Node() : next(nullptr) { }
256  };
257 
258  CACHE_ALIGNED(Node*, first);
259  CACHE_ALIGNED(Node*, last);
260  CACHE_ALIGNED(Spinlock, consumerLock);
261  CACHE_ALIGNED(Spinlock, producerLock);
262 
263 public:
264  UnboundedQueue() {
265  /* 'first' is a dummy value */
266  first = last = new Node();
267  }
268 
269  ~UnboundedQueue() {
270  while( first != nullptr ) { // release the list
271  Node* tmp = first;
272  first = tmp->next;
273  delete tmp;
274  }
275  }
276 
277  void insert(const T& t) {
278  Node* tmp = new Node();
279  tmp->data = t;
280  std::lock_guard<Spinlock> lock(producerLock);
281  last->next = tmp; // publish to consumers
282  last = tmp; // swing last forward
283  }
284 
285  bool try_remove( T& result) {
286  std::lock_guard<Spinlock> lock(consumerLock);
287  Node* theFirst = first;
288  Node* theNext = first->next;
289  if( theNext != nullptr ) { // if queue is nonempty
290  result = theNext->data; // take it out
291  first = theNext; // swing first forward
292  delete theFirst; // delete the old dummy
293  return true;
294  }
295  return false;
296  }
297 
298  T remove() {
299  while(1) {
300  T res;
301  if ( try_remove(res) ) {
302  return res;
303  }
304  _mm_pause();
305  }
306  }
307 
308 };
309 
310 }
311 }
312 }
313 
314 #endif
double wait()
Wait for all threads to reach this point.
Definition: threadsafe.h:69
Definition: action.cc:17
void resize(size_t newCount)
ONLY call this while nobody is in wait()
Definition: threadsafe.h:57
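SST::Core::ThreadSafe::Spinlock
Definition: threadsafe.h:116
SST::Core::ThreadSafe::UnboundedQueue
Definition: threadsafe.h:250
SST::Core::ThreadSafe::Barrier
Definition: threadsafe.h:42
SST::Core::ThreadSafe::BoundedQueue
Definition: threadsafe.h:144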