diff --git a/src/clientpaho.cpp b/src/clientpaho.cpp index 8964f56..47bd9b9 100644 --- a/src/clientpaho.cpp +++ b/src/clientpaho.cpp @@ -186,11 +186,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - if (ConnectionStatus::Disconnected != m_connectionStatus) + if( ConnectionStatus::Disconnected != m_connectionStatus ) { return -1; } - setConnectionStatus(ConnectionStatus::ConnectInProgress); + setConnectionStatus( ConnectionStatus::ConnectInProgress ); } MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; @@ -215,12 +215,12 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) } - if (!m_username.empty()) + if( !m_username.empty() ) { conn_opts.username = m_username.c_str(); } - if (!m_password.empty()) + if( !m_password.empty() ) { conn_opts.password = m_password.c_str(); } @@ -228,30 +228,30 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) std::promise waitForConnectPromise{}; auto waitForConnect = waitForConnectPromise.get_future(); m_connectPromise.reset(); - if (wait) + if( wait ) { - m_connectPromise = std::make_unique>(std::move(waitForConnectPromise)); + m_connectPromise = std::make_unique>( std::move( waitForConnectPromise ) ); } { - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - if (!m_pendingOperations.insert(-100).second) + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); + if( !m_pendingOperations.insert( -100 ).second ) { // Write something } - m_operationResult.erase(-100); + m_operationResult.erase( -100 ); } - int rc = MQTTAsync_connect(m_client, &conn_opts); - if (MQTTASYNC_SUCCESS != rc) + int rc = MQTTAsync_connect( m_client, &conn_opts ); + if( MQTTASYNC_SUCCESS != rc ) { - setConnectionStatus(ConnectionStatus::Disconnected); - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); + setConnectionStatus( ConnectionStatus::Disconnected ); + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); m_operationResult[-100] = false; m_pendingOperations.erase(-100); } - if (wait) + if( wait ) { waitForConnect.get(); m_connectPromise.reset(); @@ -306,13 +306,15 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) m_operationResult[-200] = false; m_pendingOperations.erase(-200); - if (MQTTASYNC_DISCONNECTED == rc) { + if (MQTTASYNC_DISCONNECTED == rc) + { return -1; } // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); } - if (wait) { + if( wait ) + { if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) { // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId); @@ -334,6 +336,7 @@ std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) else if (ConnectionStatus::Disconnected == m_connectionStatus) { // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); + connect( true ); } if (!isValidTopic(message.topic())) @@ -368,7 +371,8 @@ void ClientPaho::publishPending() { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - if (!m_processPendingPublishes) { + if (!m_processPendingPublishes) + { return; } } @@ -785,9 +789,6 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - if (m_connectPromise) { - m_connectPromise->set_value(); - } // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); m_operationResult[-100] = false; m_pendingOperations.erase(-100);