diff --git a/examples/sub/main.cpp b/examples/sub/main.cpp index c9f1850..6374d90 100644 --- a/examples/sub/main.cpp +++ b/examples/sub/main.cpp @@ -76,7 +76,8 @@ int main( int argc, char* argv[] ) { std::cout << "[OK]" << std::endl; std::cout << "Connecting to the test-broker : " << std::endl; - pSubscriber->connect( "localhost", 1883, "", "" ); + pSubscriber->connect( "localhost", 1883, "", "", "test/subscriber/LWT", "Subscriber disconnected." ); + std::cout << "Subscribing to the test-topic....." << std::endl; pSubscriber->subscribe( "test/publisher/#" ); @@ -84,6 +85,7 @@ int main( int argc, char* argv[] ) while( 1 ) { sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. + std::cout << "."; } } else diff --git a/examples/sub/subscriber.cpp b/examples/sub/subscriber.cpp index 8354dd6..ac6d812 100644 --- a/examples/sub/subscriber.cpp +++ b/examples/sub/subscriber.cpp @@ -24,11 +24,25 @@ #include Subscriber::Subscriber( const std::string &client_id ) - : MqttSubscriberBase( client_id ) + : m_mqtt_client( client_id ) { } +void Subscriber::connect( const std::string &hostname, int portnumber, const std::string &username, const std::string &password, const std::string &lwt_topic, const std::string &lwt_message ) +{ + m_mqtt_client.connect( hostname, portnumber, osdev::components::mqtt::Credentials( username, password ), osdev::components::mqtt::mqtt_LWT( lwt_topic, lwt_message ) ); + std::cout << "Client state : " << m_mqtt_client.state() << std::endl; +} + +void Subscriber::subscribe( const std::string &message_topic ) +{ + m_mqtt_client.subscribe( message_topic, 1, [this](const osdev::components::mqtt::MqttMessage &message ) + { + this->receive_data( message.topic(), message.payload() ); + }); +} + void Subscriber::receive_data( const std::string &message_topic, const std::string &message_payload ) { std::cout << "[Subscriber::receive_data] - Received message : " << message_payload << " from topic : " << message_topic << std::endl; diff --git a/examples/sub/subscriber.h b/examples/sub/subscriber.h index 8c67ef4..5f30bff 100644 --- a/examples/sub/subscriber.h +++ b/examples/sub/subscriber.h @@ -22,19 +22,29 @@ #pragma once // std +#include #include // mqtt-cpp -#include "mqttsubscriberbase.h" +#include "mqttclient.h" +#include "compat-c++14.h" -class Subscriber : public MqttSubscriberBase +class Subscriber { public: - Subscriber( const std::string &client_id ); + Subscriber(const std::string &client_id); virtual ~Subscriber() {} + void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string(), + const std::string &lwt_topic = std::string(), const std::string &lwt_message = std::string() ); + + void subscribe( const std::string &message_topic ); + protected: - void receive_data( const std::string &message_topic, const std::string &message_payload ); + void receive_data( const std::string &message_topic, const std::string &message_payload ); + +private: + osdev::components::mqtt::MqttClient m_mqtt_client; }; diff --git a/include/mqttclient.h b/include/mqttclient.h index 391908a..7eaa795 100644 --- a/include/mqttclient.h +++ b/include/mqttclient.h @@ -42,6 +42,26 @@ namespace osdev { namespace components { namespace mqtt { +// Internal structure for storing subscriptions until a valid connection becomes available. +class Subscription +{ +public: + Subscription( const std::string &topic, int qos, const std::function call_back ) + : m_topic( topic ) + , m_qos( qos ) + , m_call_back(call_back ) + {} + + std::string getTopic() const { return m_topic; } + int getQoS() const { return m_qos; } + std::function getCallBack() { return m_call_back; } + +private: + std::string m_topic; + int m_qos; + std::function m_call_back; +}; + // Forward definition class IMqttClientImpl; @@ -215,6 +235,7 @@ private: mutable std::mutex m_interfaceMutex; ///< Makes the interface mutual exclusive mutable std::mutex m_internalMutex; ///< Protect the internal state. + mutable std::mutex m_subscriptionMutex; ///< Protect the deferred Subscription Buffer std::string m_endpoint; ///< The endpoint uri. std::string m_clientId; ///< The main client identification. std::set m_activeTokens; ///< Set with active command tokens. Callbacks still need to be made for these tokens. @@ -225,6 +246,7 @@ private: std::vector> m_additionalClients; ///< A vector of additional wrapper clients. SynchronizedQueue> m_eventQueue; ///< Synchronized queue for scheduling additional work. std::thread m_workerThread; ///< A worker thread that is used to perform actions that cannot be done on the callback threads. + std::vector m_deferredSubscriptions; ///< A buffer to store subscription requests until the principal client comes online }; } // End namespace mqtt diff --git a/src/clientpaho.cpp b/src/clientpaho.cpp index 24e6239..1dcbf6a 100644 --- a/src/clientpaho.cpp +++ b/src/clientpaho.cpp @@ -425,12 +425,14 @@ std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std { if( ConnectionStatus::Connected != m_connectionStatus ) { - // MqttException, "Not connected" + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) ); } if( !isValidTopic( topic ) ) { // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) ); + return -1; } if( qos > 2 ) @@ -477,6 +479,7 @@ std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std if( isOverlappingInternal( topic, existingTopic ) ) { // (OverlappingTopicException, "overlapping topic", existingTopic, topic); + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) ); } LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) ); @@ -677,7 +680,7 @@ std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos ) if( !m_pendingOperations.insert( opts.token ).second ) { - LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) ); + // LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) ); } m_operationResult.erase( opts.token ); return opts.token; @@ -715,10 +718,12 @@ std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos ) void ClientPaho::setConnectionStatus( ConnectionStatus status ) { + LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - " ) ); ConnectionStatus curStatus = m_connectionStatus; m_connectionStatus = status; if( status != curStatus && m_connectionStatusCallback ) { + LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - Calling m_connectionStatusCallback" ) ); m_connectionStatusCallback( m_clientId, status ); } } @@ -784,6 +789,8 @@ void ClientPaho::onConnectOnInstance( const std::string& cause ) void ClientPaho::onConnectSuccessOnInstance() { + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", + std::string( m_clientId + " - onConnectSuccessOnInstance triggered." ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Register the connect callback that is used in reconnect scenarios. @@ -889,7 +896,7 @@ void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response ) void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response ) { auto pd = response.publishData(); - LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) ); + // LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); diff --git a/src/mqttclient.cpp b/src/mqttclient.cpp index ad478fb..e1454f4 100644 --- a/src/mqttclient.cpp +++ b/src/mqttclient.cpp @@ -57,6 +57,7 @@ std::string generateUniqueClientId(const std::string& clientId, std::size_t clie MqttClient::MqttClient(const std::string& _clientId, const std::function& deliveryCompleteCallback) : m_interfaceMutex() , m_internalMutex() + , m_subscriptionMutex() , m_endpoint() , m_clientId(_clientId) , m_activeTokens() @@ -67,6 +68,7 @@ MqttClient::MqttClient(const std::string& _clientId, const std::functionconnectionStatus() == ConnectionStatus::Disconnected) + if (!m_principalClient || m_principalClient->connectionStatus() != ConnectionStatus::Connected) { LogError("MqttClient", std::string( m_clientId + " - Unable to subscribe, not connected" ) ); - // throw (?)(MqttException, "Not connected"); + // Store the subscription in the buffer for later processing. + { + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex); + m_deferredSubscriptions.emplace_back( topic, qos, cb ); + } + return Token(m_clientId, -1); } + if (!m_principalClient->isOverlapping(topic)) { client = m_principalClient.get(); @@ -263,13 +271,15 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi { for (const auto& c : m_additionalClients) { - if (!c->isOverlapping(topic)) { + if (!c->isOverlapping(topic)) + { client = c.get(); clientFound = true; break; } } } + if (!clientFound) { LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) ); @@ -282,9 +292,10 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi client = m_additionalClients.back().get(); } } + if (!clientFound) { - client->connect(true); + client->connect( true ); } return Token{ client->clientId(), client->subscribe(topic, qos, cb) }; } @@ -296,13 +307,15 @@ std::set MqttClient::unsubscribe(const std::string& topic, int qos) std::vector clients{}; { OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); - if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) { + if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) + { LogError("[MqttClient::unsubscribe]", std::string( m_clientId + " - Unable to unsubscribe, not connected" ) ); // Throw (MqttException, "Not connected"); return std::set(); } clients.push_back(m_principalClient.get()); - for (const auto& c : m_additionalClients) { + for (const auto& c : m_additionalClients) + { clients.push_back(c.get()); } } @@ -386,31 +399,54 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus std::vector connectionStates{}; { OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); - if (!m_principalClient) { - return; - } - if (m_principalClient) { + + if (m_principalClient) + { principalClient = m_principalClient.get(); clients.push_back(principalClient); connectionStates.push_back(m_principalClient->connectionStatus()); } - for (const auto& c : m_additionalClients) { + + for (const auto& c : m_additionalClients) + { clients.push_back(c.get()); connectionStates.push_back(c->connectionStatus()); } } + auto newState = determineState(connectionStates); - bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); - if (resubscribe) { + // bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState); + bool resubscribe = ( StateEnum::Good == newState ); + if (resubscribe) + { + // First activate pending subscriptions + { + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex); + LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) ); + while( m_deferredSubscriptions.size() > 0 ) + { + auto subscription = m_deferredSubscriptions.at( 0 ); + this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() ); + m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() ); + } + } + + LogDebug( "[MqttClient::connectionStatusChanged]", + std::string( m_clientId + " - Resubscribing..." ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); m_activeTokens.clear(); } - for (auto* cl : clients) { - try { + + for (auto* cl : clients) + { + try + { + LogDebug( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - Client " + cl->clientId() + " has " + std::string( cl->hasPendingSubscriptions() ? "" : "no" ) + " pending subscriptions" ) ); cl->resubscribe(); } - catch (const std::exception& e) { + catch (const std::exception& e) + { LogError("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - resubscribe on wrapped client " + cl->clientId() + " in context of connection status change in wrapped client : " + id + " => FAILED : " + e.what() ) ); } } @@ -420,20 +456,32 @@ void MqttClient::connectionStatusChanged(const std::string& id, ConnectionStatus // The server state change and a possible resubscription are done in the context of the MqttClient worker thread // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions. this->pushEvent([this, resubscribe, clients, principalClient, newState]() { - if (resubscribe) { + if (resubscribe) + { // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens. // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread. auto waitFor = std::chrono::milliseconds(1000); - if (!waitForCompletionInternalClients(clients, waitFor, std::set{})) { - if (std::accumulate(clients.begin(), clients.end(), false, [](bool hasPending, IMqttClientImpl* client) { return hasPending || client->hasPendingSubscriptions(); })) { + if (!waitForCompletionInternalClients(clients, waitFor, std::set{})) + { + if (std::accumulate(clients.begin(), + clients.end(), + false, + [](bool hasPending, IMqttClientImpl* client) + { + return hasPending || client->hasPendingSubscriptions(); + })) + { LogWarning("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - subscriptions are not recovered within timeout." ) ); } } - if (principalClient) { - try { + if (principalClient) + { + try + { principalClient->publishPending(); } - catch (const std::exception& e) { + catch (const std::exception& e) + { LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); } }