/* ****************************************************************************
 * Copyright 2019 Open Systems Development BV                                 *
 *                                                                            *
 * Permission is hereby granted, free of charge, to any person obtaining a    *
 * copy of this software and associated documentation files (the "Software"), *
 * to deal in the Software without restriction, including without limitation  *
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,   *
 * and/or sell copies of the Software, and to permit persons to whom the      *
 * Software is furnished to do so, subject to the following conditions:       *
 *                                                                            *
 * The above copyright notice and this permission notice shall be included in *
 * all copies or substantial portions of the Software.                        *
 *                                                                            *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,   *
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL    *
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING    *
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER        *
 * DEALINGS IN THE SOFTWARE.                                                  *
 * ***************************************************************************/
#ifndef OSDEV_COMPONENTS_MQTT_MQTTCLIENT_H
#define OSDEV_COMPONENTS_MQTT_MQTTCLIENT_H

// std
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// osdev::components::mqtt
#include "istatecallback.h"
#include "imqttclient.h"
#include "mqtt_lwt.h"
#include "serverstate.h"
#include "synchronizedqueue.h"

// osdev::components::logger
#include "log.h"

namespace osdev {
namespace components {
namespace mqtt {

// Internal structure for storing subscriptions until a valid connection becomes available.
class Subscription { public: Subscription( const std::string &topic, int qos, const std::function call_back ) : m_topic( topic ) , m_qos( qos ) , m_call_back(call_back ) {} std::string getTopic() const { return m_topic; } int getQoS() const { return m_qos; } std::function getCallBack() { return m_call_back; } private: std::string m_topic; int m_qos; std::function m_call_back; }; // Forward definition class IMqttClientImpl; class MqttClient : public virtual IMqttClient { public: /*! * \brief Construct an instance of the MqttClient. * \param clientId The client identification used in the connection to the mqtt broker. * \param deliveryCompleteCallback Optional callback used to signal completion of a publication. */ MqttClient( const std::string& clientId, const std::function& deliveryCompleteCallback = std::function{}); virtual ~MqttClient() override; // Non copyable, non movable MqttClient(const MqttClient&) = delete; MqttClient& operator=(const MqttClient&) = delete; MqttClient(MqttClient&&) = delete; MqttClient& operator=(MqttClient&&) = delete; /** * @see IStateCallback */ virtual std::string clientId() const override; /** * @see IStateCallback */ virtual StateChangeCallbackHandle registerStateChangeCallback(const SlotStateChange& cb) override; /** * @see IStateCallback */ virtual void unregisterStateChangeCallback(StateChangeCallbackHandle handle) override; /** * @see IStateCallback */ virtual void clearAllStateChangeCallbacks() override; /** * @see IStateCallback */ virtual StateEnum state() const override; // MqttClient interface /** * @see IMqttClient */ virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override; /** * @see IMqttClient */ virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override; /** * @see IMqttClient */ virtual void disconnect() override; /** * @see IMqttClient */ virtual Token 
publish(const MqttMessage& message, int qos) override; /** * @see IMqttClient * When an overlapping subscription is detected a new connection has to be created. This is a synchronous action * and this method will wait for the connection to be up. */ virtual Token subscribe(const std::string& topic, int qos, const std::function& cb) override; /** * @see IMqttClient */ virtual std::set unsubscribe(const std::string& topic, int qos) override; /** * @see IMqttClient */ virtual bool waitForCompletion(std::chrono::milliseconds waitFor) const override; /** * @see IMqttClient */ virtual bool waitForCompletion(std::chrono::milliseconds waitFor, const Token& token) const override; /** * @see IMqttClient */ virtual bool waitForCompletion(std::chrono::milliseconds waitFor, const std::set& tokens) const override; /** * @see IMqttClient */ virtual boost::optional commandResult(const Token& token) const override; /** * @see IMqttClient */ virtual std::string endpoint() const override; /*! * \brief setMask update the current logMask * \param logMask - Enum defining the logmask used. */ void setMask ( osdev::components::log::LogMask logMask ); /*! * \brief setLogLevel update the current logLevel * \param logLevel - Enum defining the logLevel used, in combination with Mask. */ void setLogLevel( osdev::components::log::LogLevel logLevel ); /*! * \brief setContext update the current context * \param context - String containing the new context name. */ void setContext ( std::string context ); private: /*! * \brief Callback used to pick up the connection status of the wrappers. * \param id The client id. * \param cs The connection status. */ void connectionStatusChanged( const std::string& id, ConnectionStatus cs ); /*! * \brief Callback for handling delivery complete. * \param clientId The identifier of the client on which the publish command is executed. * \param token The token identifies the publish command. 
*/ void deliveryComplete( const std::string& clientId, std::int32_t token ); /** * \brief Wait for commands to complete including active tokens in this client. * The interface mutex is not locked by this method. * First wait for client commands to complete (use method waitForCompletionInternalClients) * and then wait for publish delivery callbacks to complete. * \param clients - Vector with client wrapper pointers that need to be waited on. * \param[in,out] waitFor - The number of milliseconds to wait for completetion of all commands and delivery callbacks. * \param tokens - The tokens to wait for. An empty set means to wait for all commands on all clients to complete * including all publish delivery callbacks. * \return True when commands have completed in time including delivery callbacks or false if not. */ bool waitForCompletionInternal(const std::vector& clients, std::chrono::milliseconds waitFor, const std::set& tokens) const; /** * \brief Wait for commands on the wrapper clients to complete. * The interface mutex is not locked by this method. * \param clients - Vector with client wrapper pointers that need to be waited on. * \param[in,out] waitFor - The number of milliseconds to wait for completetion of all commands. * On return waitFor contains the time left. * \param tokens - The tokens to wait for. An empty set means to wait for all commands * on all clients to complete. * \return True when all commands have completed in time or false if not. */ bool waitForCompletionInternalClients(const std::vector& clients, std::chrono::milliseconds& waitFor, const std::set& tokens) const; /** * @brief Determine the state of this client based on the connection statusses of its client wrappers. * The states this client can communicate are: * Unknown : When at least one wrapper is in a different state then Connected or ReconnectInProgress or when no wrappers are available. * Good : When all wrappers are connected. 
* ConnectionFailure : When at least one wrapper attempts reconnection. * Unregister : When the serverstate instance is destroyed. * * The other states are about the information providers to the mqtt broker (the publishers) and we cannot say anything about them here. * The state "Good" is the exception. This state means in this case that this clients connection is ok and not that the underlying data * source (publisher) is ok. * * @param connectionStates A vector with the connection statusses of all client wrappers. */ StateEnum determineState(const std::vector& connectionStates); /** * @brief Add an event to the synchronized queue. * @param ev A function object that performs work in the context of this class. */ void pushEvent(std::function ev); /** * @brief Worker method that executes the events. */ void eventHandler(); mutable std::mutex m_interfaceMutex; ///< Makes the interface mutual exclusive mutable std::mutex m_internalMutex; ///< Protect the internal state. mutable std::mutex m_subscriptionMutex; ///< Protect the deferred Subscription Buffer std::string m_endpoint; ///< The endpoint uri. std::string m_clientId; ///< The main client identification. std::set m_activeTokens; ///< Set with active command tokens. Callbacks still need to be made for these tokens. mutable std::condition_variable m_activeTokensCV; ///< Wait on a condition to become true w.r.t. the active token set. std::function m_deliveryCompleteCallback; ///< Optional callback for publish completion. ServerState m_serverState; ///< Records the state of the connection to the broker that this client is connected to. std::unique_ptr m_principalClient; ///< The main wrapper client. std::vector> m_additionalClients; ///< A vector of additional wrapper clients. SynchronizedQueue> m_eventQueue; ///< Synchronized queue for scheduling additional work. std::thread m_workerThread; ///< A worker thread that is used to perform actions that cannot be done on the callback threads. 
std::vector m_deferredSubscriptions; ///< A buffer to store subscription requests until the principal client comes online }; } // End namespace mqtt } // End namespace components } // End namespace osdev #endif // OSDEV_COMPONENTS_MQTT_MQTTCLIENT_H