#ifndef OSDEV_COMPONENTS_MQTT_MQTTCLIENT_H #define OSDEV_COMPONENTS_MQTT_MQTTCLIENT_H // std #include #include #include #include #include #include // osdev::components::mqtt #include "synchronizedqueue.h" #include "istatecallback.h" #include "serverstate.h" #include "imqttclient.h" namespace osdev { namespace components { namespace mqtt { // Forward definition class IMqttClientImpl; class MqttClient : public virtual IMqttClient { public: /*! * \brief Construct an instance of the MqttClient. * \param clientId The client identification used in the connection to the mqtt broker. * \param deliveryCompleteCallback Optional callback used to signal completion of a publication. */ MqttClient( const std::string& clientId, const std::function& deliveryCompleteCallback = std::function{}); virtual ~MqttClient() override; // Non copyable, non movable MqttClient(const MqttClient&) = delete; MqttClient& operator=(const MqttClient&) = delete; MqttClient(MqttClient&&) = delete; MqttClient& operator=(MqttClient&&) = delete; /** * @see IStateCallback */ virtual std::string clientId() const override; /** * @see IStateCallback */ virtual StateChangeCallbackHandle registerStateChangeCallback(const SlotStateChange& cb) override; /** * @see IStateCallback */ virtual void unregisterStateChangeCallback(StateChangeCallbackHandle handle) override; /** * @see IStateCallback */ virtual void clearAllStateChangeCallbacks() override; /** * @see IStateCallback */ virtual StateEnum state() const override; // MqttClient interface /** * @see IMqttClient */ virtual void connect(const std::string& host, int port, const Credentials& credentials) override; /** * @see IMqttClient */ virtual void connect(const std::string& endpoint) override; /** * @see IMqttClient */ virtual void disconnect() override; /** * @see IMqttClient */ virtual Token publish(const MqttMessage& message, int qos) override; /** * @see IMqttClient * When an overlapping subscription is detected a new connection has to be created. This is a synchronous action * and this method will wait for the connection to be up. */ virtual Token subscribe(const std::string& topic, int qos, const std::function& cb) override; /** * @see IMqttClient */ virtual std::set unsubscribe(const std::string& topic, int qos) override; /** * @see IMqttClient */ virtual bool waitForCompletion(std::chrono::milliseconds waitFor) const override; /** * @see IMqttClient */ virtual bool waitForCompletion(std::chrono::milliseconds waitFor, const Token& token) const override; /** * @see IMqttClient */ virtual bool waitForCompletion(std::chrono::milliseconds waitFor, const std::set& tokens) const override; /** * @see IMqttClient */ virtual boost::optional commandResult(const Token& token) const override; /** * @see IMqttClient */ virtual std::string endpoint() const override; private: /*! * \brief Callback used to pick up the connection status of the wrappers. * \param id The client id. * \param cs The connection status. */ void connectionStatusChanged( const std::string& id, ConnectionStatus cs ); /*! * \brief Callback for handling delivery complete. * \param clientId The identifier of the client on which the publish command is executed. * \param token The token identifies the publish command. */ void deliveryComplete( const std::string& clientId, std::int32_t token ); /** * \brief Wait for commands to complete including active tokens in this client. * The interface mutex is not locked by this method. * First wait for client commands to complete (use method waitForCompletionInternalClients) * and then wait for publish delivery callbacks to complete. * \param clients - Vector with client wrapper pointers that need to be waited on. * \param[in,out] waitFor - The number of milliseconds to wait for completetion of all commands and delivery callbacks. * \param tokens - The tokens to wait for. An empty set means to wait for all commands on all clients to complete * including all publish delivery callbacks. * \return True when commands have completed in time including delivery callbacks or false if not. */ bool waitForCompletionInternal(const std::vector& clients, std::chrono::milliseconds waitFor, const std::set& tokens) const; /** * \brief Wait for commands on the wrapper clients to complete. * The interface mutex is not locked by this method. * \param clients - Vector with client wrapper pointers that need to be waited on. * \param[in,out] waitFor - The number of milliseconds to wait for completetion of all commands. * On return waitFor contains the time left. * \param tokens - The tokens to wait for. An empty set means to wait for all commands * on all clients to complete. * \return True when all commands have completed in time or false if not. */ bool waitForCompletionInternalClients(const std::vector& clients, std::chrono::milliseconds& waitFor, const std::set& tokens) const; /** * @brief Determine the state of this client based on the connection statusses of its client wrappers. * The states this client can communicate are: * Unknown : When at least one wrapper is in a different state then Connected or ReconnectInProgress or when no wrappers are available. * Good : When all wrappers are connected. * ConnectionFailure : When at least one wrapper attempts reconnection. * Unregister : When the serverstate instance is destroyed. * * The other states are about the information providers to the mqtt broker (the publishers) and we cannot say anything about them here. * The state "Good" is the exception. This state means in this case that this clients connection is ok and not that the underlying data * source (publisher) is ok. * * @param connectionStates A vector with the connection statusses of all client wrappers. */ StateEnum determineState(const std::vector& connectionStates); /** * @brief Add an event to the synchronized queue. * @param ev A function object that performs work in the context of this class. */ void pushEvent(std::function ev); /** * @brief Worker method that executes the events. */ void eventHandler(); mutable std::mutex m_interfaceMutex; ///< Makes the interface mutual exclusive mutable std::mutex m_internalMutex; ///< Protect the internal state. std::string m_endpoint; ///< The endpoint uri. std::string m_clientId; ///< The main client identification. std::set m_activeTokens; ///< Set with active command tokens. Callbacks still need to be made for these tokens. mutable std::condition_variable m_activeTokensCV; ///< Wait on a condition to become true w.r.t. the active token set. std::function m_deliveryCompleteCallback; ///< Optional callback for publish completion. ServerState m_serverState; ///< Records the state of the connection to the broker that this client is connected to. std::unique_ptr m_principalClient; ///< The main wrapper client. std::vector> m_additionalClients; ///< A vector of additional wrapper clients. SynchronizedQueue> m_eventQueue; ///< Synchronized queue for scheduling additional work. std::thread m_workerThread; ///< A worker thread that is used to perform actions that cannot be done on the callback threads. }; } // End namespace mqtt } // End namespace components } // End namespace osdev #endif // OSDEV_COMPONENTS_MQTT_MQTTCLIENT_H