Commit a670240bdae74c0515b76307249ba5fb6a34e70f

Authored by Peter M. Groen
1 parent 9421324b

Fix on connection

include/clientpaho.h
@@ -207,7 +207,7 @@ private: @@ -207,7 +207,7 @@ private:
207 * @brief Callback method that is called when a first connect succeeds. 207 * @brief Callback method that is called when a first connect succeeds.
208 * @param reason Som extra information if there is any. 208 * @param reason Som extra information if there is any.
209 */ 209 */
210 - void onFirstConnectInstance(const std::string &reason); 210 + void onFirstConnectOnInstance(const std::string &reason);
211 211
212 /** 212 /**
213 * @brief Callback method that is called when a reconnect succeeds. 213 * @brief Callback method that is called when a reconnect succeeds.
src/clientpaho.cpp
@@ -133,8 +133,8 @@ ClientPaho::ClientPaho(const std::string& _endpoint, @@ -133,8 +133,8 @@ ClientPaho::ClientPaho(const std::string& _endpoint,
133 { 133 {
134 MQTTAsync_setCallbacks(m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete); 134 MQTTAsync_setCallbacks(m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete);
135 LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback.") ); 135 LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback.") );
136 - /*  
137 - auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onConnect ); 136 +
  137 + auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect );
138 if( MQTTASYNC_SUCCESS == ccb ) 138 if( MQTTASYNC_SUCCESS == ccb )
139 { 139 {
140 LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") ); 140 LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
@@ -143,7 +143,7 @@ ClientPaho::ClientPaho(const std::string&amp; _endpoint, @@ -143,7 +143,7 @@ ClientPaho::ClientPaho(const std::string&amp; _endpoint,
143 { 143 {
144 LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") ); 144 LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
145 } 145 }
146 - */ 146 +
147 m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this); 147 m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this);
148 } 148 }
149 else 149 else
@@ -368,10 +368,13 @@ std::int32_t ClientPaho::publish(const MqttMessage&amp; message, int qos) @@ -368,10 +368,13 @@ std::int32_t ClientPaho::publish(const MqttMessage&amp; message, int qos)
368 368
369 369
370 std::unique_lock<std::mutex> lck(m_mutex); 370 std::unique_lock<std::mutex> lck(m_mutex);
371 - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) { 371 + if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes)
  372 + // if (ConnectionStatus::Connected != m_connectionStatus || m_processPendingPublishes)
  373 + {
372 m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; }); 374 m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
373 - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus) {  
374 - // ("ClientPaho", "Adding publish to pending queue."); 375 + if(ConnectionStatus::ReconnectInProgress == m_connectionStatus)
  376 + {
  377 + LogDebug( "ClientPaho", "Adding publish to pending queue.");
375 m_pendingPublishes.push_front(Publish{ qos, message }); 378 m_pendingPublishes.push_front(Publish{ qos, message });
376 return -1; 379 return -1;
377 } 380 }
@@ -392,7 +395,7 @@ void ClientPaho::publishPending() @@ -392,7 +395,7 @@ void ClientPaho::publishPending()
392 395
393 if (ConnectionStatus::Connected != m_connectionStatus) 396 if (ConnectionStatus::Connected != m_connectionStatus)
394 { 397 {
395 - // MqttException, "Not connected"); 398 + LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) )
396 } 399 }
397 400
398 while (!m_pendingPublishes.empty()) 401 while (!m_pendingPublishes.empty())
@@ -735,7 +738,7 @@ void ClientPaho::pushIncomingEvent(std::function&lt;void()&gt; ev) @@ -735,7 +738,7 @@ void ClientPaho::pushIncomingEvent(std::function&lt;void()&gt; ev)
735 738
736 void ClientPaho::callbackEventHandler() 739 void ClientPaho::callbackEventHandler()
737 { 740 {
738 - // ("ClientPaho", "%1 - starting callback event handler", m_clientId); 741 + LogDebug("ClientPaho", std::string( m_clientId + " - starting callback event handler") );
739 for (;;) { 742 for (;;) {
740 std::vector<std::function<void()>> events; 743 std::vector<std::function<void()>> events;
741 if (!m_callbackEventQueue.pop(events)) 744 if (!m_callbackEventQueue.pop(events))
@@ -752,16 +755,16 @@ void ClientPaho::callbackEventHandler() @@ -752,16 +755,16 @@ void ClientPaho::callbackEventHandler()
752 // ("ClientPaho", "%1 - leaving callback event handler", m_clientId); 755 // ("ClientPaho", "%1 - leaving callback event handler", m_clientId);
753 } 756 }
754 757
755 -void ClientPaho::onFirstConnectInstance(const std::string &reason) 758 +void ClientPaho::onFirstConnectOnInstance(const std::string &reason)
756 { 759 {
757 (void)reason; 760 (void)reason;
758 { 761 {
759 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 762 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
760 - // Register the connect callback that is used in reconnect scenarios.  
761 - auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect); 763 + // Remove the connect callback that is used in reconnect scenarios.
  764 + auto rc = MQTTAsync_setConnected(m_client, this, nullptr );
762 if (MQTTASYNC_SUCCESS != rc) 765 if (MQTTASYNC_SUCCESS != rc)
763 { 766 {
764 - LogError( "[ClientPaho]", std::string( "onFirstConnectInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); 767 + LogError( "[ClientPaho]", std::string( "onFirstConnectOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
765 } 768 }
766 } 769 }
767 770