mqttclient.h 12 KB
/* ****************************************************************************
 * Copyright 2019 Open Systems Development BV                                 *
 *                                                                            *
 * Permission is hereby granted, free of charge, to any person obtaining a    *
 * copy of this software and associated documentation files (the "Software"), *
 * to deal in the Software without restriction, including without limitation  *
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,   *
 * and/or sell copies of the Software, and to permit persons to whom the      *
 * Software is furnished to do so, subject to the following conditions:       *
 *                                                                            *
 * The above copyright notice and this permission notice shall be included in *
 * all copies or substantial portions of the Software.                        *
 *                                                                            *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,   *
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL    *
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING    *
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER        *
 * DEALINGS IN THE SOFTWARE.                                                  *
 * ***************************************************************************/
#ifndef OSDEV_COMPONENTS_MQTT_MQTTCLIENT_H
#define OSDEV_COMPONENTS_MQTT_MQTTCLIENT_H

// std
#include <condition_variable>
#include <memory>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// osdev::components::mqtt

#include "istatecallback.h"
#include "imqttclient.h"
#include "mqtt_lwt.h"
#include "serverstate.h"
#include "synchronizedqueue.h"

// osdev::components::logger
#include "log.h"

namespace osdev {
namespace components {
namespace mqtt {

// Internal structure for storing subscriptions until a valid connection becomes available.
class Subscription
{
public:
    Subscription( const std::string &topic, int qos, const std::function<void(MqttMessage)> call_back )
        : m_topic( topic )
        , m_qos( qos )
        , m_call_back(call_back )
    {}

    std::string     getTopic() const { return m_topic; }
    int             getQoS() const { return m_qos; }
    std::function<void(MqttMessage)>  getCallBack() { return m_call_back; }

private:
    std::string m_topic;
    int         m_qos;
    std::function<void(MqttMessage)> m_call_back;
};

// Forward definition
class IMqttClientImpl;

class MqttClient : public virtual IMqttClient
{
public:
    /*!
     *  \brief  Construct an instance of the MqttClient.
     *  \param  clientId The client identification used in the connection to the mqtt broker.
     *  \param  deliveryCompleteCallback Optional callback used to signal completion of a publication.
     */
    MqttClient( const std::string& clientId, const std::function<void(const Token& token)>& deliveryCompleteCallback = std::function<void(const Token& token)>{});
    virtual ~MqttClient() override;

    // Non copyable, non movable
    MqttClient(const MqttClient&) = delete;
    MqttClient& operator=(const MqttClient&) = delete;
    MqttClient(MqttClient&&) = delete;
    MqttClient& operator=(MqttClient&&) = delete;

    /**
     * @see IStateCallback
     */
    virtual std::string clientId() const override;

    /**
     * @see IStateCallback
     */
    virtual StateChangeCallbackHandle registerStateChangeCallback(const SlotStateChange& cb) override;

    /**
     * @see IStateCallback
     */
    virtual void unregisterStateChangeCallback(StateChangeCallbackHandle handle) override;

    /**
     * @see IStateCallback
     */
    virtual void clearAllStateChangeCallbacks() override;

    /**
     * @see IStateCallback
     */
    virtual StateEnum state() const override;

    // MqttClient interface

    /**
     * @see IMqttClient
     */
    virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override;

    /**
     * @see IMqttClient
     */
    virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override;

    /**
     * @see IMqttClient
     */
    virtual void disconnect() override;

    /**
     * @see IMqttClient
     */
    virtual Token publish(const MqttMessage& message, int qos) override;

    /**
     * @see IMqttClient
     * When an overlapping subscription is detected a new connection has to be created. This is a synchronous action
     * and this method will wait for the connection to be up.
     */
    virtual Token subscribe(const std::string& topic, int qos, const std::function<void(MqttMessage)>& cb) override;

    /**
     * @see IMqttClient
     */
    virtual std::set<Token> unsubscribe(const std::string& topic, int qos) override;

    /**
     * @see IMqttClient
     */
    virtual bool waitForCompletion(std::chrono::milliseconds waitFor) const override;

    /**
     * @see IMqttClient
     */
    virtual bool waitForCompletion(std::chrono::milliseconds waitFor, const Token& token) const override;

    /**
     * @see IMqttClient
     */
    virtual bool waitForCompletion(std::chrono::milliseconds waitFor, const std::set<Token>& tokens) const override;

    /**
     * @see IMqttClient
     */
    virtual boost::optional<bool> commandResult(const Token& token) const override;

    /**
     * @see IMqttClient
     */
    virtual std::string endpoint() const override;

    /*!
     * \brief setMask update the current logMask
     * \param logMask   - Enum defining the logmask used.
     */
    void setMask    ( osdev::components::log::LogMask logMask );
    /*!
     * \brief setLogLevel update the current logLevel
     * \param logLevel  - Enum defining the logLevel used, in combination with Mask.
     */
    void setLogLevel( osdev::components::log::LogLevel logLevel );
    /*!
     * \brief setContext update the current context
     * \param context   - String containing the new context name.
     */
    void setContext ( std::string context );

private:
    /*!
     *  \brief  Callback used to pick up the connection status of the wrappers.
     *  \param  id The client id.
     *  \param  cs The connection status.
     */
    void connectionStatusChanged( const std::string& id, ConnectionStatus cs );

    /*!
      * \brief  Callback for handling delivery complete.
      * \param  clientId The identifier of the client on which the publish command is executed.
      * \param  token The token identifies the publish command.
      */
    void deliveryComplete( const std::string& clientId, std::int32_t token );

    /**
     *  \brief  Wait for commands to complete including active tokens in this client.
     *          The interface mutex is not locked by this method.
     *          First wait for client commands to complete (use method waitForCompletionInternalClients)
     *          and then wait for publish delivery callbacks to complete.
     *  \param          clients     - Vector with client wrapper pointers that need to be waited on.
     *  \param[in,out]  waitFor     - The number of milliseconds to wait for completetion of all commands and delivery callbacks.
     *  \param          tokens      - The tokens to wait for. An empty set means to wait for all commands on all clients to complete
     *                                including all publish delivery callbacks.
     *  \return True when commands have completed in time including delivery callbacks or false if not.
     */
    bool waitForCompletionInternal(const std::vector<IMqttClientImpl*>& clients, std::chrono::milliseconds waitFor, const std::set<Token>& tokens) const;

    /**
     *  \brief  Wait for commands on the wrapper clients to complete.
     *          The interface mutex is not locked by this method.
     *  \param          clients     - Vector with client wrapper pointers that need to be waited on.
     *  \param[in,out]  waitFor     - The number of milliseconds to wait for completetion of all commands.
     *                                On return waitFor contains the time left.
     *  \param          tokens      - The tokens to wait for. An empty set means to wait for all commands
     *                                on all clients to complete.
     *  \return True when all commands have completed in time or false if not.
     */
    bool waitForCompletionInternalClients(const std::vector<IMqttClientImpl*>& clients, std::chrono::milliseconds& waitFor, const std::set<Token>& tokens) const;

    /**
     * @brief Determine the state of this client based on the connection statusses of its client wrappers.
     * The states this client can communicate are:
     * Unknown           : When at least one wrapper is in a different state then Connected or ReconnectInProgress or when no wrappers are available.
     * Good              : When all wrappers are connected.
     * ConnectionFailure : When at least one wrapper attempts reconnection.
     * Unregister        : When the serverstate instance is destroyed.
     *
     * The other states are about the information providers to the mqtt broker (the publishers) and we cannot say anything about them here.
     * The state "Good" is the exception. This state means in this case that this clients connection is ok and not that the underlying data
     * source (publisher) is ok.
     *
     * @param connectionStates A vector with the connection statusses of all client wrappers.
     */
    StateEnum determineState(const std::vector<ConnectionStatus>& connectionStates);

    /**
     * @brief Add an event to the synchronized queue.
     * @param ev A function object that performs work in the context of this class.
     */
    void pushEvent(std::function<void()> ev);

    /**
     * @brief Worker method that executes the events.
     */
    void eventHandler();

    mutable std::mutex m_interfaceMutex;                                    ///< Makes the interface mutual exclusive
    mutable std::mutex m_internalMutex;                                     ///< Protect the internal state.
    mutable std::mutex m_subscriptionMutex;                                 ///< Protect the deferred Subscription Buffer
    std::string m_endpoint;                                                 ///< The endpoint uri.
    std::string m_clientId;                                                 ///< The main client identification.
    std::set<Token> m_activeTokens;                                         ///< Set with active command tokens. Callbacks still need to be made for these tokens.
    mutable std::condition_variable m_activeTokensCV;                       ///< Wait on a condition to become true w.r.t. the active token set.
    std::function<void(const Token&)> m_deliveryCompleteCallback;           ///< Optional callback for publish completion.
    ServerState m_serverState;                                              ///< Records the state of the connection to the broker that this client is connected to.
    std::unique_ptr<IMqttClientImpl> m_principalClient;                     ///< The main wrapper client.
    std::vector<std::unique_ptr<IMqttClientImpl>> m_additionalClients;      ///< A vector of additional wrapper clients.
    SynchronizedQueue<std::function<void()>> m_eventQueue;                  ///< Synchronized queue for scheduling additional work.
    std::thread m_workerThread;                                             ///< A worker thread that is used to perform actions that cannot be done on the callback threads.
    std::vector<Subscription> m_deferredSubscriptions;                       ///< A buffer to store subscription requests until the principal client comes online
};

}       // End namespace mqtt
}       // End namespace components
}       // End namespace osdev

#endif  // OSDEV_COMPONENTS_MQTT_MQTTCLIENT_H