diff --git a/include/clientpaho.h b/include/clientpaho.h index 2f4fff2..bfe6e2b 100644 --- a/include/clientpaho.h +++ b/include/clientpaho.h @@ -207,7 +207,7 @@ private: * @brief Callback method that is called when a first connect succeeds. * @param reason Som extra information if there is any. */ - void onFirstConnectInstance(const std::string &reason); + void onFirstConnectOnInstance(const std::string &reason); /** * @brief Callback method that is called when a reconnect succeeds. diff --git a/src/clientpaho.cpp b/src/clientpaho.cpp index a52453f..ccede0b 100644 --- a/src/clientpaho.cpp +++ b/src/clientpaho.cpp @@ -133,8 +133,8 @@ ClientPaho::ClientPaho(const std::string& _endpoint, { MQTTAsync_setCallbacks(m_client, reinterpret_cast(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete); LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback.") ); - /* - auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast(this), ClientPaho::onConnect ); + + auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast(this), ClientPaho::onFirstConnect ); if( MQTTASYNC_SUCCESS == ccb ) { LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") ); @@ -143,7 +143,7 @@ ClientPaho::ClientPaho(const std::string& _endpoint, { LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") ); } - */ + m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this); } else @@ -368,10 +368,13 @@ std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) std::unique_lock lck(m_mutex); - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) { + if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) + // if (ConnectionStatus::Connected != m_connectionStatus || m_processPendingPublishes) + { m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; }); - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus) { - // ("ClientPaho", "Adding publish to pending queue."); + if(ConnectionStatus::ReconnectInProgress == m_connectionStatus) + { + LogDebug( "ClientPaho", "Adding publish to pending queue."); m_pendingPublishes.push_front(Publish{ qos, message }); return -1; } @@ -392,7 +395,7 @@ void ClientPaho::publishPending() if (ConnectionStatus::Connected != m_connectionStatus) { - // MqttException, "Not connected"); + LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) ) } while (!m_pendingPublishes.empty()) @@ -735,7 +738,7 @@ void ClientPaho::pushIncomingEvent(std::function ev) void ClientPaho::callbackEventHandler() { - // ("ClientPaho", "%1 - starting callback event handler", m_clientId); + LogDebug("ClientPaho", std::string( m_clientId + " - starting callback event handler") ); for (;;) { std::vector> events; if (!m_callbackEventQueue.pop(events)) @@ -752,16 +755,16 @@ void ClientPaho::callbackEventHandler() // ("ClientPaho", "%1 - leaving callback event handler", m_clientId); } -void ClientPaho::onFirstConnectInstance(const std::string &reason) +void ClientPaho::onFirstConnectOnInstance(const std::string &reason) { (void)reason; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - // Register the connect callback that is used in reconnect scenarios. - auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect); + // Remove the connect callback that is used in reconnect scenarios. + auto rc = MQTTAsync_setConnected(m_client, this, nullptr ); if (MQTTASYNC_SUCCESS != rc) { - LogError( "[ClientPaho]", std::string( "onFirstConnectInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); + LogError( "[ClientPaho]", std::string( "onFirstConnectOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); } }