synchronizedqueue.h 11.6 KB
/* ****************************************************************************
 * Copyright 2019 Open Systems Development BV                                 *
 *                                                                            *
 * Permission is hereby granted, free of charge, to any person obtaining a    *
 * copy of this software and associated documentation files (the "Software"), *
 * to deal in the Software without restriction, including without limitation  *
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,   *
 * and/or sell copies of the Software, and to permit persons to whom the      *
 * Software is furnished to do so, subject to the following conditions:       *
 *                                                                            *
 * The above copyright notice and this permission notice shall be included in *
 * all copies or substantial portions of the Software.                        *
 *                                                                            *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,   *
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL    *
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING    *
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER        *
 * DEALINGS IN THE SOFTWARE.                                                  *
 * ***************************************************************************/
#ifndef OSDEV_COMPONENTS_MQTT_SYNCHRONIZEDQUEUE_H
#define OSDEV_COMPONENTS_MQTT_SYNCHRONIZEDQUEUE_H

// std
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <type_traits>

// osdev::components::mqtt
#include "lockguard.h"
#include "metaprogrammingdefs.h"

namespace osdev {
namespace components {
namespace mqtt {

OSDEV_COMPONENTS_HASMETHOD_TRAIT(capacity, )

/*!
 *  \brief  Generic Queue template for defining a thin
 *          wrapper around std::queue to make overflow detection possible.
 *          This template has no definition and will lead to a compile
 *          time error when it is chosen.
 */
template <typename T, typename C, typename enable = void>
class Queue;

/*!
 *  \brief  A specialization when the underlying container has a capacity method.
 *          To detect overflow the capacity of the underlying container is needed.
 *          Not all containers have a capacity.
 *          When the capacity method is not available SFINAE will discard this template.
 */
template <typename T, typename C>
class Queue<T, C, typename std::enable_if<has_capacity<C>::value>::type> : public std::queue<T, C>
{
public:
    using size_type = typename std::queue<T, C>::size_type;

    typename C::size_type capacity() const
    {
        return this->c.capacity();
    }
};

/*!
 *  \brief  A specialization for when the underlying container does not support a capacity
 *          In this case max_size is returned which results in overflow not being detected.
 */
template <typename T, typename C>
class Queue<T, C, typename std::enable_if<!has_capacity<C>::value>::type> : public std::queue<T, C>
{
public:
    using size_type = typename std::queue<T, C>::size_type;
    typename C::size_type capacity() const
    {
        return this->c.max_size();
    }
};

/*!
 *  \brief  Represents a synchronized queue
 *  @tparam T The type of the items in the queue
 *  @tparam C The underlying character. The container must satisfy the
 *            requirements of a SequenceContainer.
 *            Addittionally container must supply a pop_front and a max_size method.
 *
 *            The underlying container determines the overflow behaviour.
 *            A circular buffer leads to a lossy queue that drops items when the
 *            queue is full while a std::deque will never overflow.
 *
 *            The queue has the following states: started, paused, stopped.
 *            In the started state the queue acceptsincoming items and it allows items
 *            to be popped when data is available.
 *            In the paused state incoming items are allowed. The pop method will
 *            block until the queue is unpaused or stopped.
 *            In the stopped state incoming items are not allowed and dropped.
 *            The pop method will return false.
 */
template <typename T, typename C = std::deque<T>>
class SynchronizedQueue
{
public:
    using QueueType = Queue<T, C>;

    /*!
     * \brief Constructs an empty queue
     * \param id     - Identification string for this queue ( used in logging ).
     * \param paused - The state in which to setup the queue ( pause or active),
     *                 ( Default is active )
     */
    explicit SynchronizedQueue( const std::string &id, bool paused = false )
        : m_id( id )
        , m_queueMutex()
        , m_dataAvailableOrStopCV()
        , m_queue()
        , m_stop( false )
        , m_pause( paused )
        , m_numOverflows( 0 )
    {}

    /*!
     *  \brief  Stops the queue on destruction
     */
    ~SynchronizedQueue()
    {
        this->stop();
    }

    /*!
     *  @brief  Pushes the item in the queue
     *  @tparam TItem   - The cv qualified type of the value to push.
     *                    In a stopped state the queue drops incoming data.
     *                    In a paused / active state the queue accepts incoming data.
     *  @tparam item    - The item to push to the queue.
     */
    template <typename TItem>
    void push(TItem &&item)
    {
        OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC( m_queueMutex, m_id );
        if( m_stop )
        {
            return;
        }

        if( m_queue.capacity() == m_queue.size() )
        {
            if( m_numOverflows++ % 100 == 0 )
            {
                // Log a warning that there is a number of overflows.
            }
        }
        m_queue.push( std::forward<TItem>(item) );
        OSDEV_COMPONENTS_UNIQUELOCK_UNLOCK_SPECIFIC(m_queueMutex, m_id);
        m_dataAvailableOrStopCV.notify_one();
    }

    /*!
     * @brief   pop  - Pops an item from the queue. This method blocks when te state is paused or when there is no data available.
     * @param   item - The item to which to copy the popped item.
     * @return  True if an item was popped: otherwise false
     */
    bool pop(T& item)
    {
        OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC( m_queueMutex, m_id );
        m_dataAvailableOrStopCV.wait(OSDEV_COMPONENTS_UNIQUELOCK(m_queueMutex), [this]()
        { return ((this->m_queue.size() > 0 && !m_pause) || this->m_stop); });
        if( m_stop )
        {
            return false;
        }
        item = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }

    bool pop(std::vector<T> &items)
    {
        OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC( m_queueMutex, m_id );
        m_dataAvailableOrStopCV.wait(OSDEV_COMPONENTS_UNIQUELOCK(m_queueMutex), [this]()
        { return ((this->m_queue.size() > 0 && !m_pause) || this->m_stop); });
        if( m_stop )
        {
            return false;
        }
        items.clear();
        items.reserve(m_queue.size());
        while( m_queue.size() > 0 )
        {
            items.emplace_back(std::move(m_queue.front() ) );
            m_queue.pop();
        }
        return true;
    }

    /*!
     *  \return The current size of the queue
     */
    typename QueueType::size_type size() const
    {
        OSDEV_COMPONENTS_LOCKGUARD_SPECIFIC(m_queueMutex, m_id);
        return m_queue.size();
    }


    /*!
     *  \brief  Start the Queue
     *  The queue is only started when it is in a stopped state.
     *  \param  paused  - If true, the queue will be started in a paused
     *                    state which means that no items will be popped.
     */
    void start(bool paused)
    {
        // Reason that a lock is used: See documentation of std::condition_variable
        //
        // Even is the shared variable is atomic (m_stop in this case), it must be modified under the mutex
        // in order to correctly publish the modification to the waiting thread.
        //
        OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC(m_queueMutex, m_id);
        if( !m_stop )
        {
            // already started
            return;
        }
        m_stop = false;
        m_pause = paused;
        OSDEV_COMPONENTS_UNIQUELOCK_UNLOCK_SPECIFIC(m_queueMutex, m_id);
        if( !paused )
        {
            m_dataAvailableOrStopCV.notify_all();
        }
    }

    /*!
     *  \brief  Pause or unpause the queue.
     *  When the queue is paused no items will be popped.
     *  The state is not altered when the queue is stopped.
     *  \param  value   - Flag that indicates whether the queue is paused or unpaused.
     */
    void pause(bool value)
    {
        // Reason that a lock is used: see documentation of std::condition_variable
        //
        // Even if the shared variable is atomic (m_stop in this case), it must be modified under the mutex
        // in order to correctly publish the modification to the waiting thread.
        //
        OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC(m_queueMutex, m_id);
        if (m_stop) {
            return;
        }
        m_pause = value;
        OSDEV_COMPONENTS_UNIQUELOCK_UNLOCK_SPECIFIC(m_queueMutex, m_id);
        if (!value) {
            m_dataAvailableOrStopCV.notify_all();
        }
    }

    /*!
     *  \brief Stop the queue.
     *  The pop method will return a false after calling this method.
     *  The queue can be restarted with the start method.
     */
    void stop()
    {
        // Reason that a lock is used: see documentation of std::condition_variable
        //
        // Even if the shared variable is atomic (m_stop in this case), it must be modified under the mutex
        // in order to correctly publish the modification to the waiting thread.
        //
        OSDEV_COMPONENTS_UNIQUELOCK_CREATE_SPECIFIC(m_queueMutex, m_id);
        m_stop = true;
        m_pause = false;
        OSDEV_COMPONENTS_UNIQUELOCK_UNLOCK_SPECIFIC(m_queueMutex, m_id);
        m_dataAvailableOrStopCV.notify_all();
    }

    /*!
     *  \brief  Clears the queue.
     *  This method also resets the overflow counter.
     */
    void clear()
    {
        OSDEV_COMPONENTS_LOCKGUARD_SPECIFIC(m_queueMutex, m_id);
        QueueType emptyQueue;
        std::swap(m_queue, emptyQueue);
        m_numOverflows = 0;
    }

private:
    const std::string               m_id;                       ///< Queue identification string
    mutable std::mutex              m_queueMutex;               ///< Protects access to the queue
    std::condition_variable         m_dataAvailableOrStopCV;    ///< Provides wait functionality for the queue becoming empty
    QueueType                        m_queue;                    ///< Holds the items
    std::atomic_bool                m_stop;                     ///< Flag that indicates whether the queue needs to stop.
    std::atomic_bool                m_pause;                    ///< Flag that indicates whether the queue is paused.

    /*!
     *  \brief  Counts the number of items that the buffer overflows.
     *          If the underlying buffer is a ring buffer an overflow
     *          means that an item will be overwritten. For a normal
     *          sequence container it means that the it is enlarged.
     */
    std::uint32_t                   m_numOverflows;
};





}       // End namespace mqtt
}       // End namespace components
}       // End namespace osdev

#endif  // OSDEV_COMPONENTS_MQTT_SYNCHRONIZEDQUEUE_H