#ifndef HEADER_GUARD_OSGFFMPEG_BOUNDED_MESSAGE_QUEUE_H #define HEADER_GUARD_OSGFFMPEG_BOUNDED_MESSAGE_QUEUE_H #include #include #include #include #include #include namespace osgFFmpeg { template class BoundedMessageQueue { public: typedef T value_type; typedef size_t size_type; explicit BoundedMessageQueue(size_type capacity); ~BoundedMessageQueue(); void clear(); template void flush(const Destructor destructor); void push(const value_type & value); bool tryPush(const value_type & value); bool timedPush(const value_type & value, unsigned long ms); value_type pop(); value_type tryPop(bool & is_empty); value_type timedPop(bool & is_empty, unsigned long ms); private: BoundedMessageQueue(const BoundedMessageQueue &); BoundedMessageQueue & operator = (const BoundedMessageQueue &); typedef std::vector Buffer; typedef OpenThreads::Condition Condition; typedef OpenThreads::Mutex Mutex; typedef OpenThreads::ScopedLock ScopedLock; bool isFull() const; bool isEmpty() const; void unsafePush(const value_type & value); value_type unsafePop(); Buffer m_buffer; size_type m_begin; size_type m_end; size_type m_size; Mutex m_mutex; Condition m_not_empty; Condition m_not_full; }; template BoundedMessageQueue::BoundedMessageQueue(const size_type capacity) : m_buffer(capacity), m_begin(0), m_end(0), m_size(0) { } template BoundedMessageQueue::~BoundedMessageQueue() { } template void BoundedMessageQueue::clear() { { ScopedLock lock(m_mutex); m_buffer.clear(); m_begin = 0; m_end = 0; m_size = 0; } m_not_full.broadcast(); } template template void BoundedMessageQueue::flush(const Destructor destructor) { { ScopedLock lock(m_mutex); while (! isEmpty()) { value_type value = unsafePop(); destructor(value); } m_begin = 0; m_end = 0; m_size = 0; } m_not_full.broadcast(); } template void BoundedMessageQueue::push(const value_type & value) { { ScopedLock lock(m_mutex); while (isFull()) m_not_full.wait(&m_mutex); unsafePush(value); } m_not_empty.signal(); } template bool BoundedMessageQueue::tryPush(const value_type & value) { { ScopedLock lock(m_mutex); if (isFull()) return false; unsafePush(value); } m_not_empty.signal(); return true; } template bool BoundedMessageQueue::timedPush(const value_type & value, const unsigned long ms) { // We don't wait in a loop to avoid an infinite loop (as the ms timeout would not be decremented). // This means that timedPush() could return false before the timeout has been hit. { ScopedLock lock(m_mutex); if (isFull()) m_not_full.wait(&m_mutex, ms); if (isFull()) return false; unsafePush(value); } m_not_empty.signal(); return true; } template typename BoundedMessageQueue::value_type BoundedMessageQueue::pop() { value_type value; { ScopedLock lock(m_mutex); while (isEmpty()) m_not_empty.wait(&m_mutex); value = unsafePop(); } m_not_full.signal(); return value; } template typename BoundedMessageQueue::value_type BoundedMessageQueue::tryPop(bool & is_empty) { value_type value; { ScopedLock lock(m_mutex); is_empty = isEmpty(); if (is_empty) return value_type(); value = unsafePop(); } m_not_full.signal(); return value; } template typename BoundedMessageQueue::value_type BoundedMessageQueue::timedPop(bool & is_empty, const unsigned long ms) { value_type value; { ScopedLock lock(m_mutex); // We don't wait in a loop to avoid an infinite loop (as the ms timeout would not be decremented). // This means that timedPop() could return with (is_empty = true) before the timeout has been hit. if (isEmpty()) m_not_empty.wait(&m_mutex, ms); is_empty = isEmpty(); if (is_empty) return value_type(); value = unsafePop(); } m_not_full.signal(); return value; } template inline bool BoundedMessageQueue::isFull() const { return m_size == m_buffer.size(); } template inline bool BoundedMessageQueue::isEmpty() const { return m_size == 0; } template inline void BoundedMessageQueue::unsafePush(const value_type & value) { // Note: this shall never be called if the queue is full. assert(! isFull()); m_buffer[m_end++] = value; if (m_end == m_buffer.size()) m_end = 0; ++m_size; } template inline typename BoundedMessageQueue::value_type BoundedMessageQueue::unsafePop() { // Note: this shall never be called if the queue is empty. assert(! isEmpty()); const size_t pos = m_begin; ++m_begin; --m_size; if (m_begin == m_buffer.size()) m_begin = 0; return m_buffer[pos]; } } // namespace osgFFmpeg #endif // HEADER_GUARD_OSGFFMPEG_BOUNDED_MESSAGE_QUEUE_H