SST  6.1.0
StructuralSimulationToolkit
circularBuffer.h
1 // Copyright 2009-2016 Sandia Corporation. Under the terms
2 // of Contract DE-AC04-94AL85000 with Sandia Corporation, the U.S.
3 // Government retains certain rights in this software.
4 //
5 // Copyright (c) 2009-2016, Sandia Corporation
6 // All rights reserved.
7 //
8 // This file is part of the SST software package. For license
9 // information, see the LICENSE file in the top level directory of the
10 // distribution.
11 
12 #ifndef SST_CORE_INTERPROCESS_CIRCULARBUFFER_H
13 #define SST_CORE_INTERPROCESS_CIRCULARBUFFER_H 1
14 
#include <cstddef>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
19 
20 namespace SST {
21 namespace Core {
22 namespace Interprocess {
23 
24 /**
25  * Multi-process safe, Circular Buffer class
26  *
27  * @tparam T Type of data item to store in the buffer
28  * @tparam A Memory Allocator type to use
29  */
30 template <typename T>
31 class CircularBuffer {
32 
33  pthread_mutex_t mtx;
34  pthread_cond_t cond_full, cond_empty;
35  pthread_condattr_t attrcond;
36  pthread_mutexattr_t attrmutex;
37 
38  size_t rPtr, wPtr;
39  size_t buffSize;
40  T buffer[0]; // Actual size: buffSize
41 
42 
43 public:
44  /**
45  * Construct a new circular buffer
46  * @param bufferSize Number of elements in the buffer
47  * @param allocator Memory allocator to use for constructing the buffer
48  */
49  CircularBuffer(size_t bufferSize = 0) :
50  rPtr(0), wPtr(0), buffSize(bufferSize)
51  {
52  pthread_mutexattr_init(&attrmutex);
53  pthread_mutexattr_setpshared(&attrmutex, PTHREAD_PROCESS_SHARED);
54  if ( pthread_mutex_init(&mtx, &attrmutex) ) {
55  fprintf(stderr, "Failed to initialie mutex: %s\n", strerror(errno));
56  exit(1);
57  }
58 
59  pthread_condattr_init(&attrcond);
60  pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
61  if ( pthread_cond_init(&cond_full, &attrcond) ) {
62  fprintf(stderr, "Failed to initialie condition vars: %s\n", strerror(errno));
63  exit(1);
64  }
65  if ( pthread_cond_init(&cond_empty, &attrcond) ) {
66  fprintf(stderr, "Failed to initialie condition vars: %s\n", strerror(errno));
67  exit(1);
68  }
69  }
70 
71  ~CircularBuffer() {
72  pthread_mutex_destroy(&mtx);
73  pthread_mutexattr_destroy(&attrmutex);
74 
75  pthread_cond_destroy(&cond_full);
76  pthread_cond_destroy(&cond_empty);
77  pthread_condattr_destroy(&attrcond);
78  }
79 
80  void setBufferSize(size_t bufferSize)
81  {
82  if ( buffSize != 0 ) {
83  fprintf(stderr, "Already specified size for buffer\n");
84  exit(1);
85  }
86  buffSize = bufferSize;
87  }
88 
89  /**
90  * Write a value to the circular buffer
91  * @param value New Value to write
92  */
93  void write(const T &value)
94  {
95  if ( pthread_mutex_lock(&mtx) ) {
96  fprintf(stderr, "LOCKING ERROR: %s\n", strerror(errno));
97  }
98  while ( (wPtr+1) % buffSize == rPtr ) {
99  pthread_cond_wait(&cond_full, &mtx);
100  }
101 
102  buffer[wPtr] = value;
103  wPtr = (wPtr +1 ) % buffSize;
104 
105  __sync_synchronize();
106  pthread_cond_signal(&cond_empty);
107  pthread_mutex_unlock(&mtx);
108  }
109 
110  /**
111  * Blocking Read a value from the circular buffer
112  * @return The next item in the queue to be read
113  */
114  T read(void)
115  {
116  if ( pthread_mutex_lock(&mtx) ) {
117  fprintf(stderr, "LOCKING ERROR: %s\n", strerror(errno));
118  }
119  while ( rPtr == wPtr ) {
120  pthread_cond_wait(&cond_empty, &mtx);
121  }
122 
123  T ans = buffer[rPtr];
124  rPtr = (rPtr +1 ) % buffSize;
125 
126  __sync_synchronize();
127  pthread_cond_signal(&cond_full);
128  pthread_mutex_unlock(&mtx);
129  return ans;
130  }
131 
132  /**
133  * Non-Blocking Read a value from the circular buffer
134  * @param result Pointer to an item which will be filled in if possible
135  * @return True if an item was available, False otherwise
136  */
137  bool readNB(T *result)
138  {
139  if ( pthread_mutex_trylock(&mtx) != 0 ) return false;
140  if ( rPtr == wPtr ) {
141  pthread_mutex_unlock(&mtx);
142  return false;
143  }
144 
145  *result = buffer[rPtr];
146  rPtr = (rPtr +1 ) % buffSize;
147 
148  __sync_synchronize();
149  pthread_cond_signal(&cond_full);
150  pthread_mutex_unlock(&mtx);
151  return true;
152  }
153 
154 };
155 
156 }
157 }
158 }
159 
160 
161 
162 #endif
CircularBuffer(size_t bufferSize=0)
Construct a new circular buffer.
Definition: circularBuffer.h:49
Definition: action.cc:17
Multi-process safe, Circular Buffer class.
Definition: circularBuffer.h:31
T read(void)
Blocking Read a value from the circular buffer.
Definition: circularBuffer.h:114
bool readNB(T *result)
Non-Blocking Read a value from the circular buffer.
Definition: circularBuffer.h:137
void write(const T &value)
Write a value to the circular buffer.
Definition: circularBuffer.h:93