00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 #ifndef SST_CORE_INTERPROCESS_CIRCULARBUFFER_H
00013 #define SST_CORE_INTERPROCESS_CIRCULARBUFFER_H 1
00014 
00015 #include <cstddef>
00016 
00017 #include <boost/interprocess/sync/interprocess_mutex.hpp>
00018 #include <boost/interprocess/sync/scoped_lock.hpp>
00019 #include <boost/interprocess/sync/interprocess_condition.hpp>
00020 #include <boost/interprocess/containers/vector.hpp>
00021 
00022 
00023 namespace SST {
00024 namespace Core {
00025 namespace Interprocess {
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 template <typename T, typename A>
00034 class CircularBuffer {
00035     typedef boost::interprocess::vector<T, A> RawBuffer_t;
00036 
00037     boost::interprocess::interprocess_mutex mutex;
00038     boost::interprocess::interprocess_condition cond_full, cond_empty;
00039 
00040     size_t rPtr, wPtr;
00041     RawBuffer_t buffer;
00042     size_t buffSize;
00043 
00044 
00045 public:
00046 
00047 
00048 
00049 
00050 
00051     CircularBuffer(size_t bufferSize, const A & allocator) :
00052         rPtr(0), wPtr(0), buffer(allocator), buffSize(bufferSize)
00053     {
00054         buffer.resize(buffSize);
00055     }
00056 
00057 
00058 
00059 
00060 
00061     void write(const T &value)
00062     {
00063                 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00064                 while ( (wPtr+1) % buffSize == rPtr ) cond_full.wait(lock);
00065 
00066         buffer[wPtr] = value;
00067         wPtr = (wPtr +1 ) % buffSize;
00068 
00069                 cond_empty.notify_one();
00070     }
00071 
00072 
00073 
00074 
00075 
00076     T read(void)
00077     {
00078                 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex);
00079                 while ( rPtr == wPtr ) cond_empty.wait(lock);
00080 
00081         T ans = buffer[rPtr];
00082         rPtr = (rPtr +1 ) % buffSize;
00083 
00084                 cond_full.notify_one();
00085 
00086         return ans;
00087     }
00088 
00089 
00090 
00091 
00092 
00093 
00094     bool readNB(T *result)
00095     {
00096                 boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(mutex, boost::interprocess::try_to_lock);
00097         if ( !lock ) return false;
00098                 if ( rPtr == wPtr ) return false;
00099 
00100         *result = buffer[rPtr];
00101         rPtr = (rPtr +1 ) % buffSize;
00102 
00103                 cond_full.notify_one();
00104 
00105         return true;
00106     }
00107 
00108 };
00109 
00110 }
00111 }
00112 }
00113 
00114 
00115 
00116 #endif