#include #include #include "Async.h" #include "sleep.h" #include using namespace std; #define ASYNC_DEBUG(a) // ( cout << "ASYNC QUEUE: " << a << endl ) class AsyncQueue; static void * queueFunc( void * ops ) { ASYNC_DEBUG( "running queue function" ); NonLockingFifo< AsyncOperation * > * operations; operations = ( NonLockingFifo< AsyncOperation * > * ) ops; while( true ) { ASYNC_DEBUG( "blocking until op is avail" ); operations->blockUntilCanGetAtLeast( 1 ); ASYNC_DEBUG( "Avail" ); ASYNC_DEBUG( "getting block" ); AsyncOperation * op = operations->get(); ASYNC_DEBUG( "got block" ); if( op == NULL ) { ASYNC_DEBUG( "Done with Queue (null)." ); return NULL; } if( op->shouldExit() ) { ASYNC_DEBUG( "Done with Queue (shouldExit)." ); return NULL; } op->performOperation(); } // we should never get here! return NULL; } AsyncQueue::AsyncQueue( size_t numOps ) : operations( numOps, NULL ), isThreadStarted( false ) { ASYNC_DEBUG( "created async Queue with " << numOps << " ops." ); } void AsyncQueue::enqueue( AsyncOperation *ao ) throw( CannotEnqueue ) { ASYNC_DEBUG( "Enqueuing operation (checking if we can)." ); if( !operations.canPut() ) { throw CannotEnqueue( CannotEnqueue::BUFFER_FULL ); } ASYNC_DEBUG( "Enqueuing operation (we can)." ); operations.put( ao ); ASYNC_DEBUG( "Enqued operation." ); } //if you want to change priority, do it in the first op. //returns 0 on sucess, otherwise returns error from pthread_create() int AsyncQueue::start() { ASYNC_DEBUG( "starting Queue." ); if( isThreadStarted ) { ASYNC_DEBUG( "Warning: trying to restart a thread. ignoring." ) ; return 0; } ASYNC_DEBUG( "starting Queue 2." ); int ret = pthread_create( &thread, NULL, (pthread_function_t) &queueFunc, (void *) &(operations) ); if( ret ) return ret; isThreadStarted = true; ASYNC_DEBUG( "started Queue." ); return ret; } bool AsyncQueue::isRunning() { return isThreadStarted; } int AsyncQueue::stop() { ASYNC_DEBUG( "calling async stop." << endl ); if( !isThreadStarted ) return 0; isThreadStarted = false; ASYNC_DEBUG( "enqueuing ExitThreadOperation." << endl ); ExitThreadOperation eto; while( true ) { long sleepTime = 10; ASYNC_DEBUG( "trying to enqueue..." << endl ); try { enqueue( &eto ); ASYNC_DEBUG( "enqueued..." << endl ); break; } catch (CannotEnqueue ce) { ASYNC_DEBUG("Sleeping breifly while queue is full (on wait)"<