/* * This template is for implementing a ring buffer. It does not block or use * system locks of any kind. It uses rather rigid memory bariers to ensure data integrity. * * based on code from here: http://www.musicdsp.org/archive.php?classid=5#119 */ #ifndef FIFO_H #define FIFO_H #include #include #include #include #include #include #include #include "Atomic.h" using std::vector; using std::exception; using namespace std; class QueueFullException : exception { } ; /* template class LockingFifo { private: int_condition_t cond;//locks & stores the number of buffers that have data. volatile unsigned readidx, writeidx; vector buffer; volatile bool isResetting; public: LockingFifo (size_t bufsz) : readidx(0), writeidx(0), buffer(bufsz) { int_condition_init( &cond, 0 ); isResetting=false; } //note that put and get update the value of user data as the very last step //so these things pretty much need to be finnished before they register. int numAvailable() { return cond.userData; } //blocks until there are at least num objects in Fifo or until reset. // if reset, function returns 0, otherwise it returns number in queue size_t blockUntilAtLeast( size_t num ) { safe_pthread_mutex_lock( &(cond.mutex) ); if( isResetting ) { int_condition_unlock_and_update( &cond ); return 0; } while( size_t(cond.userData) < num ) { safe_pthread_cond_wait( &(cond.cond), &(cond.mutex) ); if( isResetting ) { int_condition_unlock_and_update( &cond ); return 0; } } if( isResetting ) { int_condition_unlock_and_update( &cond ); return 0; } int d = cond.userData; int_condition_unlock_and_update( &cond ); return d; } size_t capacity() { return buffer.size(); } T get (void) { //--block until we have at least one piece of data: safe_pthread_mutex_lock( &(cond.mutex) ); if( isResetting ) { int_condition_unlock_and_update( &cond ); return NULL; } while( cond.userData <= 0 ) { safe_pthread_cond_wait( &(cond.cond), &(cond.mutex) ); if( isResetting ) { int_condition_unlock_and_update( &cond ); return NULL; } } if( isResetting ) { int_condition_unlock_and_update( &cond ); return NULL; } T result = buffer[readidx]; if ((readidx + 1) >= buffer.size()) readidx = 0; else readidx = readidx + 1; //update the value and release lock cond.userData--; int_condition_unlock_and_update( &cond ); return result; } void put (T datum) { //if( datum==NULL ) // cout << "*********Attempt to enter NULL data.\n"; //--block until there is room for at least one piece of data: safe_pthread_mutex_lock( &(cond.mutex) ); if( isResetting ) { int_condition_unlock_and_update( &cond ); return; } while( cond.userData >= (signed) buffer.size() ) { safe_pthread_cond_wait( &(cond.cond), &(cond.mutex) ); if( isResetting ) { int_condition_unlock_and_update( &cond ); return; } } if( isResetting ) { int_condition_unlock_and_update( &cond ); return; } unsigned newidx; if ((writeidx + 1) >= buffer.size()) newidx = 0; else newidx = writeidx + 1; buffer[writeidx] = datum; writeidx = newidx; //update the value and release lock cond.userData++; int_condition_unlock_and_update( &cond ); } //--any threads waiting on locks in get or put will be forced // to return immediately. get will return NULL. // after calling interrupt and assuing that stray threads are no longer // waiting on put/get, either call reset or continue. void interrupt() { int_condition_lock( &cond ); isResetting=true; int_condition_unlock_and_update( &cond ); //this just helps the other threads run but isn't necessary: //sched_yield(); } //--Continues normal use after interrupt without any change to the data // this function does not wait on any locks so all threads should // be joined when it is called. void resume() { isResetting=false; } //--Resets all values to their initial settings. // this function does not wait on any locks so all threads should // be joined when it is called. void reset() { cond.userData=0; readidx=writeidx=0; isResetting=false; } ~LockingFifo() { int_condition_destroy( &cond ); } }; */ template class NonLockingFifo { private: unsigned readidx, writeidx; vector buffer; T onGetFailure; bool isResetting; public: NonLockingFifo (size_t bufsz, T onGetFailure) : readidx(0), writeidx(0), buffer(bufsz+1), onGetFailure( onGetFailure ), isResetting( false ) { } ~NonLockingFifo() { isResetting=false; readidx = writeidx = 0; buffer.resize( 0 ); } //Obviously, this should be called when the FIFO is empty and inactive void setCapacity( size_t s ) { readidx = writeidx = 0; buffer.resize( s+1 ); isResetting = false; } inline size_t getCapacity() { return buffer.size(); } inline const size_t getCapacity() const { return buffer.size(); } size_t blockUntilCanGetAtLeast( size_t num ) { while( !isResetting ) { size_t c = canGet(); if( c >= num ) { return c; } struct timeval tv; /* Wait up to .01 seconds. */ //cout << "waiting. " << c << " : " << num << endl; tv.tv_sec = 0; tv.tv_usec = 10000; select(0, NULL, NULL, NULL, &tv); } return canGet(); } size_t canGet() { readMemoryBarrier(); size_t w = writeidx; size_t r = readidx; if( w < r ) return buffer.size() + (w-r); else return w-r; } T get (void) { if( writeidx == readidx ) return onGetFailure; T result = buffer[readidx]; writeMemoryBarrier(); if ((readidx + 1) >= buffer.size()) readidx = 0; else readidx = readidx + 1; return result; } size_t canPut() { //cout << "00000" << endl; size_t a = buffer.size() - canGet() - 1; //cout << "00001" << endl; return a; /* size_t newidx; readMemoryBarrier(); if ((writeidx + 1) >= buffer.size()) newidx = 0; else newidx = writeidx + 1; if( newidx == readidx ) return 0; return true; */ } void put (T datum) { size_t newidx; if ((writeidx + 1) >= buffer.size()) newidx = 0; else newidx = writeidx + 1; if( newidx == readidx ) { throw QueueFullException(); } writeMemoryBarrier(); buffer[writeidx] = datum; writeidx = newidx; } // -- interrupts blockUntilAtLeast and foreces it to return. void interrupt() { isResetting=true; } //--Continues normal use after interrupt without any change to the data // this function does not wait on any locks so all threads should // be joined when it is called. void resume() { isResetting=false; } //--Resets all values to their initial settings. // this function does not wait on any locks so all threads should // be joined when it is called. void reset() { readidx=writeidx=0; isResetting=false; } }; #endif /*ifndef FIFO_H*/