/* **************************************************************************** * Copyright 2019 Open Systems Development BV * * * * Permission is hereby granted, free of charge, to any person obtaining a * * copy of this software and associated documentation files (the "Software"), * * to deal in the Software without restriction, including without limitation * * the rights to use, copy, modify, merge, publish, distribute, sublicense, * * and/or sell copies of the Software, and to permit persons to whom the * * Software is furnished to do so, subject to the following conditions: * * * * The above copyright notice and this permission notice shall be included in * * all copies or substantial portions of the Software. * * * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * * DEALINGS IN THE SOFTWARE. * * ***************************************************************************/ #ifndef OSDEV_COMPONENTS_MQTT_CLIENTPAHO_H #define OSDEV_COMPONENTS_MQTT_CLIENTPAHO_H // std #include #include #include #include #include #include #include #include #include #include // boost #include // paho #include // osdev::components::mqtt #include "synchronizedqueue.h" #include "imqttclientimpl.h" #include "mqttfailure.h" #include "mqttsuccess.h" #include "mqtt_lwt.h" namespace osdev { namespace components { namespace mqtt { /** * @brief Wrapper class for the paho-c library. * This implementation uses the clean session flag and recreates subscriptions on reconnect. * * The implementation allows multiple subscriptions as long as the subscriptions do not have overlap. For mqtt 3 it is not * possible to track down the subscription based on the incoming message meta information. By matching the topic against * the various topic filters a subscription can be identified (but only when there is only one subscription that matches). */ class ClientPaho : public IMqttClientImpl { public: /** * @brief Construct a ClientPaho instance. * @param endpoint The endpoint to connect to * @param id The clientId that is used in the connection. * @param connectionStatusCallback The callback on which connection status information is communicated. * @param deliveryCompleteCallback Callback that is called with the publish message tokens for messages that are delivered. * Being delivered has different meanings depending on the quality of service. */ ClientPaho(const std::string& endpoint, const std::string& id, const std::function& connectionStatusCallback, const std::function& deliveryCompleteCallback); virtual ~ClientPaho() override; // Non copyable, non movable. ClientPaho(const ClientPaho&) = delete; ClientPaho& operator=(const ClientPaho&) = delete; ClientPaho(ClientPaho&&) = delete; ClientPaho& operator=(ClientPaho&&) = delete; /** * @see IMqttClientImpl */ virtual std::string clientId() const override; /** * @see IMqttClientImpl */ virtual ConnectionStatus connectionStatus() const override; /** * @see IMqttClientImpl */ virtual std::int32_t connect(bool wait, const mqtt_LWT &lwt = mqtt_LWT() ) override; /** * @see IMqttClientImpl */ virtual std::int32_t disconnect(bool wait, int timeoutMs) override; /** * @see IMqttClientImpl */ virtual std::int32_t publish(const MqttMessage& message, int qos) override; /** * @see IMqttClientImpl */ virtual void publishPending() override; /** * @see IMqttClientImpl */ virtual std::int32_t subscribe(const std::string& topic, int qos, const std::function& cb) override; /** * @see IMqttClientImpl */ virtual void resubscribe() override; /** * @see IMqttClientImpl */ virtual std::int32_t unsubscribe(const std::string& topic, int qos) override; /** * @see IMqttClientImpl */ virtual void unsubscribeAll() override; /** * @see IMqttClientImpl */ virtual std::chrono::milliseconds waitForCompletion(std::chrono::milliseconds waitFor, const std::set& tokens) const override; /** * @see IMqttClientImpl */ virtual bool isOverlapping(const std::string& topic) const override; /** * @see IMqttClientImpl */ virtual bool isOverlapping(const std::string& topic, std::string& existingTopic) const override; /** * @see IMqttClientImpl */ virtual std::vector pendingOperations() const override; /** * @see IMqttClientImpl */ virtual bool hasPendingSubscriptions() const override; /** * @see IMqttClientImpl */ virtual boost::optional operationResult(std::int32_t token) const override; private: void parseEndpoint(const std::string& endpoint); std::int32_t publishInternal(const MqttMessage& message, int qos); std::int32_t subscribeInternal(const std::string& topic, int qos); void setConnectionStatus(ConnectionStatus status); bool isOverlappingInternal(const std::string& topic, std::string& existingTopic) const; /** * @brief Internal struct for subscriber information. * Used to store subscriptions. */ struct Subscription { int qos; boost::regex topicRegex; std::function callback; }; /** * @brief Internal struct for publisher information. * Used to store pending publishes during reconnection. */ struct Publish { int qos; MqttMessage data; }; /** * @brief Add an incoming callback event to the synchronized queue. * @param ev A function object that calls one of the event handlers on a ClientPaho instance, but other types of actions are also possible. */ void pushIncomingEvent(std::function ev); /** * @brief Worker method that executes the events. */ void callbackEventHandler(); /** * @brief Callback method that is called when a reconnect succeeds. * @param cause The cause of the original disconnect. */ void onConnectOnInstance(const std::string& cause); /** * @brief Callback that is called when a connect call succeeds. * This callback is also called when a reconnect succeeds because the paho library reuses the initial connect command! * The connection status is set to Connected. * @param response A success response with connection data. */ void onConnectSuccessOnInstance(); /** * @brief Callback that is called when a connect call fails after being sent to the endpoint. * This callback is also called when a reconnect fails because the paho library reuses the initial connect command! * The connection status is set to Disconnected when the connection state is ConnectInProgress, othwerwise the connection status is left as is. * @param response A failure response. */ void onConnectFailureOnInstance(const MqttFailure& response); //void onDisconnectOnInstance(enum MQTTReasonCodes reasonCode); for MQTT V5 which is not supported by centos7 paho-c /** * @brief Callback that is called when a disconnect call succeeds. * The connection status is set to Disconnected. * @param response A success response with no specific data. */ void onDisconnectSuccessOnInstance(const MqttSuccess& response); /** * @brief Callback that is called when a disconnect call fails after being sent to the endpoint. * Based on the result returned by the paho library The connection status is set to Disconnected or Connected. * @param response A failure response. */ void onDisconnectFailureOnInstance(const MqttFailure& response); /** * @brief Callback that is called when a publish call succeeds. * This callback is called before the delivery complete callback. * @param response A success response with the published message. */ void onPublishSuccessOnInstance(const MqttSuccess& response); /** * @brief Callback that is called when a publish call fails after being sent to the endpoint. * @param response A failure response. */ void onPublishFailureOnInstance(const MqttFailure& response); /** * @brief Callback that is called when a subscribe call succeeds. * @param response A success response with the subscription information. The actual used qos is conveyed in this response. */ void onSubscribeSuccessOnInstance(const MqttSuccess& response); /** * @brief Callback that is called when a subscribe call fails after being sent to the endpoint. * @param response A failure response. */ void onSubscribeFailureOnInstance(const MqttFailure& response); /** * @brief Callback that is called when an unsubscribe call succeeds. * @param response A success response with no specific data. */ void onUnsubscribeSuccessOnInstance(const MqttSuccess& response); /** * @brief Callback that is called when an unsubscribe call fails after being sent to the endpoint. * @param response A failure response. */ void onUnsubscribeFailureOnInstance(const MqttFailure& response); /** * @brief Callback that is called when a message is received. * @param message The message payload and meta data. */ int onMessageArrivedOnInstance(const MqttMessage& message); /** * @brief Callback that is called when the delivery of a publish message is considered complete. * The definition of complete depends on the quality of service used in the publish command. * @param token The token with the publish command is sent. */ void onDeliveryCompleteOnInstance(MQTTAsync_token token); /** * @brief Callback that is called when the connection is broken. * @param cause The reason string. Always "cause unknown" for mqtt3 endpoints. */ void onConnectionLostOnInstance(const std::string& cause); // Static callback functions that are registered on the paho library. Functions call their *OnInstance() counterparts. static void onFirstConnect(void* context, char* cause); static void onConnect(void* context, char* cause); static void onConnectSuccess(void* context, MQTTAsync_successData* response); static void onConnectFailure(void* context, MQTTAsync_failureData* response); //static void onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode); for MQTT V5 which is not supported by centos7 paho-c static void onDisconnectSuccess(void* context, MQTTAsync_successData* response); static void onDisconnectFailure(void* context, MQTTAsync_failureData* response); static void onPublishSuccess(void* context, MQTTAsync_successData* response); static void onPublishFailure(void* context, MQTTAsync_failureData* response); static void onSubscribeSuccess(void* context, MQTTAsync_successData* response); static void onSubscribeFailure(void* context, MQTTAsync_failureData* response); static void onUnsubscribeSuccess(void* context, MQTTAsync_successData* response); static void onUnsubscribeFailure(void* context, MQTTAsync_failureData* response); static int onMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message); static void onDeliveryComplete(void* context, MQTTAsync_token token); static void onConnectionLost(void* context, char* cause); /** * @brief Connects the paho logging to the mlogic logging system. * This callback is registered the first time a ClientPaho instance is constructed. */ static void onLogPaho(enum MQTTASYNC_TRACE_LEVELS level, char* message); mutable std::mutex m_mutex; std::string m_endpoint; std::string m_username; std::string m_password; std::string m_clientId; std::set m_pendingOperations; std::map m_operationResult; mutable std::condition_variable m_operationsCompleteCV; std::map m_subscriptions; std::map m_pendingSubscriptions; std::map m_subscribeTokenToTopic; std::map m_unsubscribeTokenToTopic; std::deque m_pendingPublishes; bool m_processPendingPublishes; mutable std::condition_variable m_pendingPublishesReadyCV; ::MQTTAsync m_client; std::atomic m_connectionStatus; std::function m_connectionStatusCallback; std::function m_deliveryCompleteCallback; MQTTAsync_token m_lastUnsubscribe; ///< centos7 workaround std::unique_ptr> m_connectPromise; std::unique_ptr> m_disconnectPromise; SynchronizedQueue> m_callbackEventQueue; std::thread m_workerThread; static std::atomic_int s_numberOfInstances; }; } // End namespace mqtt } // End namespace components } // End namespace osdev #endif // OSDEV_COMPONENTS_MQTT_CLIENTPAHO_H