threadsafe.h
// Copyright 2009-2016 Sandia Corporation. Under the terms
// of Contract DE-AC04-94AL85000 with Sandia Corporation, the U.S.
// Government retains certain rights in this software.
//
// Copyright (c) 2009-2014, Sandia Corporation
// All rights reserved.
//
// This file is part of the SST software package. For license
// information, see the LICENSE file in the top level directory of the
// distribution.

#ifndef SST_CORE_CORE_THREADSAFE_H
#define SST_CORE_CORE_THREADSAFE_H

#include <x86intrin.h>

#include <thread>
#include <atomic>
#include <condition_variable>
#include <mutex>

#include <vector>
//#include <stdalign.h>

#include <sched.h>

#include <sst/core/profile.h>

namespace SST {
namespace Core {
namespace ThreadSafe {

#if defined(__GNUC__) && ((__GNUC__ == 4 && __GNUC_MINOR__ < 8 ))
# define CACHE_ALIGNED(type, name) type name __attribute__((aligned(64)))
# define CACHE_ALIGNED_T
#else
# define CACHE_ALIGNED(type, name) alignas(64) type name
# define CACHE_ALIGNED_T alignas(64)
#endif
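/* Illustrative sketch (not part of the original header): with a C++11
 * compiler the macro pair above expands as
 *
 *     CACHE_ALIGNED(std::atomic<size_t>, rPtr);  // alignas(64) std::atomic<size_t> rPtr;
 *     struct CACHE_ALIGNED_T Node { ... };       // struct alignas(64) Node { ... };
 *
 * The 64-byte alignment puts each annotated member on its own cache line,
 * so fields updated by different threads do not false-share. */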


class Barrier {
    size_t origCount;
    std::atomic<bool> enabled;
    std::atomic<size_t> count, generation;
public:
    Barrier(size_t count) : origCount(count), enabled(true),
            count(count), generation(0)
    { }

    // Come g++ 4.7, this can become a delegating constructor
    Barrier() : origCount(0), enabled(false), count(0), generation(0)
    { }


    /** ONLY call this while nobody is in wait() */
    void resize(size_t newCount)
    {
        count = origCount = newCount;
        generation.store(0);
        enabled.store(true);
    }


    /**
     * Wait for all threads to reach this point.
     * @return 0.0, or elapsed time spent waiting, if configured with --enable-profile
     */
    double wait()
    {
        double elapsed = 0.0;
        if ( enabled ) {
            auto startTime = SST::Core::Profile::now();

            size_t gen = generation.load();
            size_t c = --count;
            if ( 0 == c ) {
                /* We should release */
                count = origCount;
                ++generation; /* Incrementing generation causes release */
            } else {
                /* Try spinning first */
                do {
                    _mm_pause();
                } while ( gen == generation.load(std::memory_order_acquire) );
            }
            elapsed = SST::Core::Profile::getElapsed(startTime);
        }
        return elapsed;
    }

    void disable()
    {
        enabled.store(false);
        count.store(0);
        ++generation;
    }
};
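
/* Usage sketch (illustrative, not part of the original header): each worker
 * thread calls wait() at the end of a phase; the last arrival resets the
 * count and bumps the generation, releasing the spinners. The names below
 * (examplePhaseWorker, nPhases) are hypothetical. */
inline void examplePhaseWorker(Barrier& barrier, int nPhases)
{
    for ( int p = 0 ; p < nPhases ; p++ ) {
        /* ... per-phase work ... */
        barrier.wait();  /* returns once every thread of this generation arrives */
    }
}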


class Spinlock {
    std::atomic<int> latch;
public:
    Spinlock() : latch(0)
    { }

    inline void lock() {
        //while ( latch.exchange(1, std::memory_order_acquire) ) {
        int zero = 0;
        while ( !latch.compare_exchange_weak(zero, 1,
                    std::memory_order_acquire,
                    std::memory_order_relaxed) ) {
            /* Lock is held (or the weak CAS failed spuriously): spin
             * read-only until the latch looks free, then retry the CAS
             * with a fresh expected value. */
            do {
                _mm_pause();
            } while ( latch.load(std::memory_order_acquire) );
            zero = 0;
        }
    }

    inline void unlock() {
        /* TODO: Understand why latch.store(0) breaks */
        --latch;
    }
};
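
/* Usage sketch (illustrative, not part of the original header): Spinlock
 * meets the BasicLockable requirements (lock()/unlock()), so it composes
 * with std::lock_guard, exactly as UnboundedQueue does below. */
inline void exampleGuardedUpdate(Spinlock& sl, int& counter)
{
    std::lock_guard<Spinlock> guard(sl);  /* acquired here, released at scope exit */
    ++counter;                            /* critical section */
}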



template<typename T>
class BoundedQueue {

    struct cell_t {
        std::atomic<size_t> sequence;
        T data;
    };

    bool initialized;
    size_t dsize;
    cell_t *data;
    CACHE_ALIGNED(std::atomic<size_t>, rPtr);
    CACHE_ALIGNED(std::atomic<size_t>, wPtr);

public:
    // BoundedQueue(size_t maxSize) : dsize(maxSize)
    BoundedQueue(size_t maxSize) : initialized(false)
    {
        initialize(maxSize);
    }

    BoundedQueue() : initialized(false) {}

    void initialize(size_t maxSize) {
        if ( initialized ) return;
        dsize = maxSize;
        data = new cell_t[dsize];
        for ( size_t i = 0 ; i < maxSize ; i++ )
            data[i].sequence.store(i);
        rPtr.store(0);
        wPtr.store(0);
        //fprintf(stderr, "%p %p: %ld\n", &rPtr, &wPtr, ((intptr_t)&wPtr - (intptr_t)&rPtr));
        initialized = true;
    }

    ~BoundedQueue()
    {
        if ( initialized ) delete [] data;
    }

    size_t size() const
    {
        return (wPtr.load() - rPtr.load());
    }

    bool empty() const
    {
        return (rPtr.load() == wPtr.load());
    }

    bool try_insert(const T& arg)
    {
        cell_t *cell = nullptr;
        size_t pos = wPtr.load(std::memory_order_relaxed);
        for (;;) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)pos;
            if ( 0 == diff ) {
                /* Cell is free for this position; try to claim the slot */
                if ( wPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
                    break;
            } else if ( 0 > diff ) {
                /* Cell still holds an unconsumed entry: queue is full */
                // fprintf(stderr, "diff = %ld: %zu - %zu\n", diff, seq, pos);
                return false;
            } else {
                /* Another producer claimed this slot; reload and retry */
                pos = wPtr.load(std::memory_order_relaxed);
            }
        }
        cell->data = arg;
        cell->sequence.store(pos+1, std::memory_order_release);
        return true;
    }

    bool try_remove(T &res)
    {
        cell_t *cell = nullptr;
        size_t pos = rPtr.load(std::memory_order_relaxed);
        for (;;) {
            cell = &data[pos % dsize];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = (intptr_t)seq - (intptr_t)(pos + 1);
            if ( 0 == diff ) {
                /* Cell holds a published entry; try to claim it */
                if ( rPtr.compare_exchange_weak(pos, pos+1, std::memory_order_relaxed) )
                    break;
            } else if ( 0 > diff ) {
                /* No entry published at this position yet: queue is empty */
                return false;
            } else {
                /* Another consumer claimed this slot; reload and retry */
                pos = rPtr.load(std::memory_order_relaxed);
            }
        }
        res = cell->data;
        /* Mark the cell free for the producer one full lap later */
        cell->sequence.store(pos + dsize, std::memory_order_release);
        return true;
    }

    T remove() {
        while(1) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
            _mm_pause();
        }
    }
};
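
/* Usage sketch (illustrative, not part of the original header): a fixed-size
 * multi-producer/multi-consumer exchange. try_insert() returns false when the
 * ring is full, so a producer must retry (or drop); remove() spins until an
 * item is available. The function name below is hypothetical. */
inline void exampleBoundedQueueUse()
{
    BoundedQueue<int> q(1024);     /* capacity fixed at construction */
    while ( !q.try_insert(42) )
        _mm_pause();               /* ring full: back off and retry */
    int v = q.remove();            /* spins until an item arrives */
    (void) v;
}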

template<typename T>
class UnboundedQueue {

    struct CACHE_ALIGNED_T Node {
        std::atomic<Node*> next;
        T data;

        Node() : next(nullptr) { }
    };

    CACHE_ALIGNED(Node*, first);
    CACHE_ALIGNED(Node*, last);
    CACHE_ALIGNED(Spinlock, consumerLock);
    CACHE_ALIGNED(Spinlock, producerLock);

public:
    UnboundedQueue() {
        /* 'first' is a dummy value */
        first = last = new Node();
    }

    ~UnboundedQueue() {
        while( first != nullptr ) {     // release the list
            Node* tmp = first;
            first = tmp->next;
            delete tmp;
        }
    }

    void insert(const T& t) {
        Node* tmp = new Node();
        tmp->data = t;
        std::lock_guard<Spinlock> lock(producerLock);
        last->next = tmp;               // publish to consumers
        last = tmp;                     // swing last forward
    }

    bool try_remove( T& result) {
        std::lock_guard<Spinlock> lock(consumerLock);
        Node* theFirst = first;
        Node* theNext = first->next;
        if( theNext != nullptr ) {      // if queue is nonempty
            result = theNext->data;     // take it out
            first = theNext;            // swing first forward
            delete theFirst;            // delete the old dummy
            return true;
        }
        return false;
    }

    T remove() {
        while(1) {
            T res;
            if ( try_remove(res) ) {
                return res;
            }
            _mm_pause();
        }
    }

};
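
/* Usage sketch (illustrative, not part of the original header): unlike
 * BoundedQueue, insert() never fails; it allocates one node per element, and
 * the separate producer/consumer spinlocks let an insert and a remove proceed
 * in parallel. The function name below is hypothetical. */
inline void exampleUnboundedQueueUse()
{
    UnboundedQueue<int> q;
    q.insert(7);                       /* allocates a node; never fails */
    int v;
    if ( q.try_remove(v) ) {
        /* v == 7 */
    }
}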

} // namespace ThreadSafe
} // namespace Core
} // namespace SST

#endif // SST_CORE_CORE_THREADSAFE_H