/* **************************************************************************** * Copyright 2019 Open Systems Development BV * * * * Permission is hereby granted, free of charge, to any person obtaining a * * copy of this software and associated documentation files (the "Software"), * * to deal in the Software without restriction, including without limitation * * the rights to use, copy, modify, merge, publish, distribute, sublicense, * * and/or sell copies of the Software, and to permit persons to whom the * * Software is furnished to do so, subject to the following conditions: * * * * The above copyright notice and this permission notice shall be included in * * all copies or substantial portions of the Software. * * * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * * DEALINGS IN THE SOFTWARE. * * ***************************************************************************/ #ifndef OSDEV_COMPONENTS_MQTT_SYNCHRONIZEDQUEUE_H #define OSDEV_COMPONENTS_MQTT_SYNCHRONIZEDQUEUE_H // std #include #include #include #include #include #include #include // osdev::components::mqtt #include "lockguard.h" #include "metaprogrammingdefs.h" namespace osdev { namespace components { namespace mqtt { OSDEV_COMPONENTS_HASMETHOD_TRAIT(capacity, ) /*! * \brief Generic Queue template for defining a thin * wrapper around std::queue to make overflow detection possible. * This template has no definition and will lead to a compile * time error when it is chosen. */ template class Queue; /*! * \brief A specialization when the underlying container has a capacity method. * To detect overflow the capacity of the underlying container is needed. * Not all containers have a capacity. * When the capacity method is not available SFINAE will discard this template. */ template class Queue::value>::type> : public std::queue { public: using size_type = typename std::queue::size_type; typename C::size_type capacity() const { return this->c.capacity(); } }; /*! * \brief A specialization for when the underlying container does not support a capacity * In this case max_size is returned which results in overflow not being detected. */ template class Queue::value>::type> : public std::queue { public: using size_type = typename std::queue::size_type; typename C::size_type capacity() const { return this->c.max_size(); } }; /*! * \brief Represents a synchronized queue * @tparam T The type of the items in the queue * @tparam C The underlying character. The container must satisfy the * requirements of a SequenceContainer. * Addittionally container must supply a pop_front and a max_size method. * * The underlying container determines the overflow behaviour. * A circular buffer leads to a lossy queue that drops items when the * queue is full while a std::deque will never overflow. * * The queue has the following states: started, paused, stopped. * In the started state the queue acceptsincoming items and it allows items * to be popped when data is available. * In the paused state incoming items are allowed. The pop method will * block until the queue is unpaused or stopped. * In the stopped state incoming items are not allowed and dropped. * The pop method will return false. */ template > class SynchronizedQueue { public: using QueueType = Queue; /*! * \brief Constructs an empty queue * \param id - Identification string for this queue ( used in logging ). * \param paused - The state in which to setup the queue ( pause or active), * ( Default is active ) */ explicit SynchronizedQueue( const std::string &id, bool paused = false ) : m_id( id ) , m_queueMutex() , m_dataAvailableOrStopCV() , m_queue() , m_stop( false ) , m_pause( paused ) , m_numOverflows( 0 ) {} /*! * \brief Stops the queue on destruction */ ~SynchronizedQueue() { this->stop(); } /*! * @brief Pushes the item in the queue * @tparam TItem - The cv qualified type of the value to push. * In a stopped state the queue drops incoming data. * In a paused / active state the queue accepts incoming data. * @tparam item - The item to push to the queue. */ template void push(TItem &&item) { OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC( m_queueMutex, m_id ); if( m_stop ) { return; } if( m_queue.capacity() == m_queue.size() ) { if( m_numOverflows++ % 100 == 0 ) { // Log a warning that there is a number of overflows. } } m_queue.push( std::forward(item) ); OSDEV_COMPONENTS_UNIQUELOCK_UNLOCK_SPECIFIC(m_queueMutex, m_id); m_dataAvailableOrStopCV.notify_one(); } /*! * @brief pop - Pops an item from the queue. This method blocks when te state is paused or when there is no data available. * @param item - The item to which to copy the popped item. * @return True if an item was popped: otherwise false */ bool pop(T& item) { OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC( m_queueMutex, m_id ); m_dataAvailableOrStopCV.wait(OSDEV_COMPONENTS_UNIQUELOCK(m_queueMutex), [this]() { return ((this->m_queue.size() > 0 && !m_pause) || this->m_stop); }); if( m_stop ) { return false; } item = std::move(m_queue.front()); m_queue.pop(); return true; } bool pop(std::vector &items) { OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC( m_queueMutex, m_id ); m_dataAvailableOrStopCV.wait(OSDEV_COMPONENTS_UNIQUELOCK(m_queueMutex), [this]() { return ((this->m_queue.size() > 0 && !m_pause) || this->m_stop); }); if( m_stop ) { return false; } items.clear(); items.reserve(m_queue.size()); while( m_queue.size() > 0 ) { items.emplace_back(std::move(m_queue.front() ) ); m_queue.pop(); } return true; } /*! * \return The current size of the queue */ typename QueueType::size_type size() const { OSDEV_COMPONENTS_LOCKGUARD_SPECIFIC(m_queueMutex, m_id); return m_queue.size(); } /*! * \brief Start the Queue * The queue is only started when it is in a stopped state. * \param paused - If true, the queue will be started in a paused * state which means that no items will be popped. */ void start(bool paused) { // Reason that a lock is used: See documentation of std::condition_variable // // Even is the shared variable is atomic (m_stop in this case), it must be modified under the mutex // in order to correctly publish the modification to the waiting thread. // OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC(m_queueMutex, m_id); if( !m_stop ) { // already started return; } m_stop = false; m_pause = paused; OSDEV_COMPONENTS_UNIQUELOCK_UNLOCK_SPECIFIC(m_queueMutex, m_id); if( !paused ) { m_dataAvailableOrStopCV.notify_all(); } } /*! * \brief Pause or unpause the queue. * When the queue is paused no items will be popped. * The state is not altered when the queue is stopped. * \param value - Flag that indicates whether the queue is paused or unpaused. */ void pause(bool value) { // Reason that a lock is used: see documentation of std::condition_variable // // Even if the shared variable is atomic (m_stop in this case), it must be modified under the mutex // in order to correctly publish the modification to the waiting thread. // OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC(m_queueMutex, m_id); if (m_stop) { return; } m_pause = value; OSDEV_COMPONENTS_UNIQUELOCK_UNLOCK_SPECIFIC(m_queueMutex, m_id); if (!value) { m_dataAvailableOrStopCV.notify_all(); } } /*! * \brief Stop the queue. * The pop method will return a false after calling this method. * The queue can be restarted with the start method. */ void stop() { // Reason that a lock is used: see documentation of std::condition_variable // // Even if the shared variable is atomic (m_stop in this case), it must be modified under the mutex // in order to correctly publish the modification to the waiting thread. // OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC(m_queueMutex, m_id); m_stop = true; m_pause = false; OSDEV_COMPONENTS_UNIQUELOCK_UNLOCK_SPECIFIC(m_queueMutex, m_id); m_dataAvailableOrStopCV.notify_all(); } /*! * \brief Clears the queue. * This method also resets the overflow counter. */ void clear() { OSDEV_COMPONENTS_LOCKGUARD_SPECIFIC(m_queueMutex, m_id); QueueType emptyQueue; std::swap(m_queue, emptyQueue); m_numOverflows = 0; } private: const std::string m_id; ///< Queue identification string mutable std::mutex m_queueMutex; ///< Protects access to the queue std::condition_variable m_dataAvailableOrStopCV; ///< Provides wait functionality for the queue becoming empty QueueType m_queue; ///< Holds the items std::atomic_bool m_stop; ///< Flag that indicates whether the queue needs to stop. std::atomic_bool m_pause; ///< Flag that indicates whether the queue is paused. /*! * \brief Counts the number of items that the buffer overflows. * If the underlying buffer is a ring buffer an overflow * means that an item will be overwritten. For a normal * sequence container it means that the it is enlarged. */ std::uint32_t m_numOverflows; }; } // End namespace mqtt } // End namespace components } // End namespace osdev #endif // OSDEV_COMPONENTS_MQTT_SYNCHRONIZEDQUEUE_H