// clientpaho.h 13.9 KB
/* ****************************************************************************
 * Copyright 2019 Open Systems Development BV                                 *
 *                                                                            *
 * Permission is hereby granted, free of charge, to any person obtaining a    *
 * copy of this software and associated documentation files (the "Software"), *
 * to deal in the Software without restriction, including without limitation  *
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,   *
 * and/or sell copies of the Software, and to permit persons to whom the      *
 * Software is furnished to do so, subject to the following conditions:       *
 *                                                                            *
 * The above copyright notice and this permission notice shall be included in *
 * all copies or substantial portions of the Software.                        *
 *                                                                            *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,   *
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL    *
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING    *
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER        *
 * DEALINGS IN THE SOFTWARE.                                                  *
 * ***************************************************************************/
#ifndef OSDEV_COMPONENTS_MQTT_CLIENTPAHO_H
#define OSDEV_COMPONENTS_MQTT_CLIENTPAHO_H

// std
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <vector>

// boost
#include <boost/optional.hpp>
#include <boost/regex.hpp>

// paho
#include <MQTTAsync.h>

// osdev::components::mqtt
#include "synchronizedqueue.h"
#include "imqttclientimpl.h"
#include "mqttfailure.h"
#include "mqttsuccess.h"
#include "mqtt_lwt.h"

namespace osdev {
namespace components {
namespace mqtt {

/**
 * @brief Wrapper class for the paho-c library.
 * This implementation uses the clean session flag and recreates subscriptions on reconnect.
 *
 * The implementation allows multiple subscriptions as long as the subscriptions do not overlap. For mqtt 3 it is not
 * possible to track down the subscription based on the incoming message meta information. By matching the topic against
 * the various topic filters a subscription can be identified (but only when there is exactly one subscription that matches).
 *
 * Instances are neither copyable nor movable (all four copy/move operations are deleted below).
 */
class ClientPaho : public IMqttClientImpl
{
public:
    /**
     * @brief Construct a ClientPaho instance.
     * @param endpoint The endpoint to connect to.
     * @param id The clientId that is used in the connection.
     * @param connectionStatusCallback The callback on which connection status information is communicated.
     * @param deliveryCompleteCallback Callback that is called with the publish message tokens for messages that are delivered.
     *                                 Being delivered has different meanings depending on the quality of service.
     */
    ClientPaho(const std::string& endpoint,
        const std::string& id,
        const std::function<void(const std::string&, ConnectionStatus)>& connectionStatusCallback,
        const std::function<void(const std::string&, std::int32_t)>& deliveryCompleteCallback);
    ~ClientPaho() override;

    // Non copyable, non movable.
    ClientPaho(const ClientPaho&) = delete;
    ClientPaho& operator=(const ClientPaho&) = delete;
    ClientPaho(ClientPaho&&) = delete;
    ClientPaho& operator=(ClientPaho&&) = delete;

    /**
     * @see IMqttClientImpl
     */
    std::string clientId() const override;

    /**
     * @see IMqttClientImpl
     */
    ConnectionStatus connectionStatus() const override;

    /**
     * @see IMqttClientImpl
     *
     * @note Default arguments of virtual functions are selected from the static type of the call expression,
     *       not from the dynamic type. The default declared here therefore only applies to calls made through
     *       a ClientPaho; keep it in sync with the declaration in IMqttClientImpl (not visible from this header).
     */
    std::int32_t connect(bool wait, const mqtt_LWT &lwt = mqtt_LWT() ) override;

    /**
     * @see IMqttClientImpl
     */
    std::int32_t disconnect(bool wait, int timeoutMs) override;

    /**
     * @see IMqttClientImpl
     */
    std::int32_t publish(const MqttMessage& message, int qos) override;

    /**
     * @see IMqttClientImpl
     */
    void publishPending() override;

    /**
     * @see IMqttClientImpl
     */
    std::int32_t subscribe(const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb) override;

    /**
     * @see IMqttClientImpl
     */
    void resubscribe() override;

    /**
     * @see IMqttClientImpl
     */
    std::int32_t unsubscribe(const std::string& topic, int qos) override;

    /**
     * @see IMqttClientImpl
     */
    void unsubscribeAll() override;

    /**
     * @see IMqttClientImpl
     */
    std::chrono::milliseconds waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const override;

    /**
     * @see IMqttClientImpl
     */
    bool isOverlapping(const std::string& topic) const override;

    /**
     * @see IMqttClientImpl
     */
    bool isOverlapping(const std::string& topic, std::string& existingTopic) const override;

    /**
     * @see IMqttClientImpl
     */
    std::vector<std::int32_t> pendingOperations() const override;

    /**
     * @see IMqttClientImpl
     */
    bool hasPendingSubscriptions() const override;

    /**
     * @see IMqttClientImpl
     */
    boost::optional<bool> operationResult(std::int32_t token) const override;

private:
    // Internal helpers. Their contracts live in the implementation file.
    // NOTE(review): parseEndpoint() presumably fills m_endpoint / m_username / m_password - confirm in the .cpp.
    void parseEndpoint(const std::string& endpoint);

    std::int32_t publishInternal(const MqttMessage& message, int qos);
    std::int32_t subscribeInternal(const std::string& topic, int qos);

    void setConnectionStatus(ConnectionStatus status);
    bool isOverlappingInternal(const std::string& topic, std::string& existingTopic) const;

    /**
     * @brief Internal struct for subscriber information.
     * Used to store subscriptions.
     */
    struct Subscription
    {
        int qos;                                     ///< Quality of service of the subscription.
        boost::regex topicRegex;                     ///< Regular expression form of the topic filter.
        std::function<void(MqttMessage)> callback;   ///< Callback stored for this subscription.
    };

    /**
     * @brief Internal struct for publisher information.
     * Used to store pending publishes during reconnection.
     */
    struct Publish
    {
        int qos;            ///< Quality of service to publish with.
        MqttMessage data;   ///< The message to publish.
    };

    /**
     * @brief Add an incoming callback event to the synchronized queue.
     * @param ev A function object that calls one of the event handlers on a ClientPaho instance, but other types of actions are also possible.
     */
    void pushIncomingEvent(std::function<void()> ev);

    /**
     * @brief Worker method that executes the events.
     */
    void callbackEventHandler();

    /**
     * @brief Callback method that is called when a reconnect succeeds.
     * @param cause The cause of the original disconnect.
     */
    void onConnectOnInstance(const std::string& cause);

    /**
     * @brief Callback that is called when a connect call succeeds.
     * This callback is also called when a reconnect succeeds because the paho library reuses the initial connect command!
     * The connection status is set to Connected.
     * @param response A success response with connection data.
     */
    void onConnectSuccessOnInstance(const MqttSuccess& response);

    /**
     * @brief Callback that is called when a connect call fails after being sent to the endpoint.
     * This callback is also called when a reconnect fails because the paho library reuses the initial connect command!
     * The connection status is set to Disconnected when the connection state is ConnectInProgress, otherwise the connection status is left as is.
     * @param response A failure response.
     */
    void onConnectFailureOnInstance(const MqttFailure& response);

    //void onDisconnectOnInstance(enum MQTTReasonCodes reasonCode); for MQTT V5 which is not supported by centos7 paho-c

    /**
     * @brief Callback that is called when a disconnect call succeeds.
     * The connection status is set to Disconnected.
     * @param response A success response with no specific data.
     */
    void onDisconnectSuccessOnInstance(const MqttSuccess& response);

    /**
     * @brief Callback that is called when a disconnect call fails after being sent to the endpoint.
     * Based on the result returned by the paho library the connection status is set to Disconnected or Connected.
     * @param response A failure response.
     */
    void onDisconnectFailureOnInstance(const MqttFailure& response);

    /**
     * @brief Callback that is called when a publish call succeeds.
     * This callback is called before the delivery complete callback.
     * @param response A success response with the published message.
     */
    void onPublishSuccessOnInstance(const MqttSuccess& response);

    /**
     * @brief Callback that is called when a publish call fails after being sent to the endpoint.
     * @param response A failure response.
     */
    void onPublishFailureOnInstance(const MqttFailure& response);

    /**
     * @brief Callback that is called when a subscribe call succeeds.
     * @param response A success response with the subscription information. The actual used qos is conveyed in this response.
     */
    void onSubscribeSuccessOnInstance(const MqttSuccess& response);

    /**
     * @brief Callback that is called when a subscribe call fails after being sent to the endpoint.
     * @param response A failure response.
     */
    void onSubscribeFailureOnInstance(const MqttFailure& response);

    /**
     * @brief Callback that is called when an unsubscribe call succeeds.
     * @param response A success response with no specific data.
     */
    void onUnsubscribeSuccessOnInstance(const MqttSuccess& response);

    /**
     * @brief Callback that is called when an unsubscribe call fails after being sent to the endpoint.
     * @param response A failure response.
     */
    void onUnsubscribeFailureOnInstance(const MqttFailure& response);

    /**
     * @brief Callback that is called when a message is received.
     * @param message The message payload and meta data.
     * @return An int handed back to the static onMessageArrived() trampoline.
     *         NOTE(review): the meaning of the value is defined by the paho messageArrived contract - confirm in the .cpp.
     */
    int onMessageArrivedOnInstance(const MqttMessage& message);

    /**
     * @brief Callback that is called when the delivery of a publish message is considered complete.
     * The definition of complete depends on the quality of service used in the publish command.
     * @param token The token with which the publish command was sent.
     */
    void onDeliveryCompleteOnInstance(MQTTAsync_token token);

    /**
     * @brief Callback that is called when the connection is broken.
     * @param cause The reason string. Always "cause unknown" for mqtt3 endpoints.
     */
    void onConnectionLostOnInstance(const std::string& cause);

    // Static callback functions that are registered on the paho library. Functions call their *OnInstance() counterparts.
    // They are static because paho-c is a C library and takes plain function pointers plus an opaque `context` pointer.
    static void onConnect(void* context, char* cause);
    static void onConnectSuccess(void* context, MQTTAsync_successData* response);
    static void onConnectFailure(void* context, MQTTAsync_failureData* response);
    //static void onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode); for MQTT V5 which is not supported by centos7 paho-c
    static void onDisconnectSuccess(void* context, MQTTAsync_successData* response);
    static void onDisconnectFailure(void* context, MQTTAsync_failureData* response);
    static void onPublishSuccess(void* context, MQTTAsync_successData* response);
    static void onPublishFailure(void* context, MQTTAsync_failureData* response);
    static void onSubscribeSuccess(void* context, MQTTAsync_successData* response);
    static void onSubscribeFailure(void* context, MQTTAsync_failureData* response);
    static void onUnsubscribeSuccess(void* context, MQTTAsync_successData* response);
    static void onUnsubscribeFailure(void* context, MQTTAsync_failureData* response);
    static int onMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message);
    static void onDeliveryComplete(void* context, MQTTAsync_token token);
    static void onConnectionLost(void* context, char* cause);

    /**
     * @brief Connects the paho logging to the mlogic logging system.
     * This callback is registered the first time a ClientPaho instance is constructed.
     */
    static void onLogPaho(enum MQTTASYNC_TRACE_LEVELS level, char* message);

    // ---------------------------------------------------------------------------------------------------------------
    // Data members.
    // The declaration order below is significant: non-static members are initialised in declaration order and
    // destroyed in reverse order. Do not reorder without checking the constructor and destructor.
    // Raw scalar/handle members carry in-class initializers so they are never indeterminate; a constructor
    // mem-initializer for the same member still takes precedence.
    // ---------------------------------------------------------------------------------------------------------------

    mutable std::mutex m_mutex;                                         ///< Mutable so it can be locked from const member functions.
    std::string m_endpoint;
    std::string m_username;
    std::string m_password;
    std::string m_clientId;
    std::set<MQTTAsync_token> m_pendingOperations;
    std::map<MQTTAsync_token, bool> m_operationResult;
    mutable std::condition_variable m_operationsCompleteCV;
    std::map<std::string, Subscription> m_subscriptions;
    std::map<std::string, Subscription> m_pendingSubscriptions;
    std::map<MQTTAsync_token, std::string> m_subscribeTokenToTopic;
    std::map<MQTTAsync_token, std::string> m_unsubscribeTokenToTopic;
    std::deque<Publish> m_pendingPublishes;
    bool m_processPendingPublishes{false};
    mutable std::condition_variable m_pendingPublishesReadyCV;
    ::MQTTAsync m_client{nullptr};                                      ///< paho client handle; null until one has been assigned.
    std::atomic<ConnectionStatus> m_connectionStatus;                   ///< No in-class default: the enumerators are not visible in this header.
    std::function<void(const std::string&, ConnectionStatus)> m_connectionStatusCallback;
    std::function<void(const std::string&, std::int32_t)> m_deliveryCompleteCallback;
    MQTTAsync_token m_lastUnsubscribe{-1}; ///< centos7 workaround. NOTE(review): -1 is assumed to mean "no unsubscribe issued yet" - confirm against the constructor.
    std::unique_ptr<std::promise<void>> m_connectPromise;
    std::unique_ptr<std::promise<void>> m_disconnectPromise;

    SynchronizedQueue<std::function<void()>> m_callbackEventQueue;
    std::thread m_workerThread;                                         ///< Keep last: constructed after, and destroyed before, every other member.

    static std::atomic_int s_numberOfInstances;
};

}       // End namespace mqtt
}       // End namespace components
}       // End namespace osdev

#endif  // OSDEV_COMPONENTS_MQTT_CLIENTPAHO_H