diff --git a/src/clientpaho.cpp b/src/clientpaho.cpp index 24573da..6a5bf76 100644 --- a/src/clientpaho.cpp +++ b/src/clientpaho.cpp @@ -280,18 +280,19 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) return -100; } -std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) +std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs ) { ConnectionStatus currentStatus = m_connectionStatus; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - if (ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus) { + if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus ) + { return -1; } currentStatus = m_connectionStatus; - setConnectionStatus(ConnectionStatus::DisconnectInProgress); + setConnectionStatus( ConnectionStatus::DisconnectInProgress ); } MQTTAsync_disconnectOptions disconn_opts = Init::initialize(); @@ -303,7 +304,8 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) std::promise waitForDisconnectPromise{}; auto waitForDisconnect = waitForDisconnectPromise.get_future(); m_disconnectPromise.reset(); - if (wait) { + if( wait ) + { m_disconnectPromise = std::make_unique>(std::move(waitForDisconnectPromise)); } @@ -311,29 +313,29 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if (!m_pendingOperations.insert(-200).second) { - // "ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200) + //"ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200) } m_operationResult.erase(-200); } int rc = MQTTAsync_disconnect(m_client, &disconn_opts); - if (MQTTASYNC_SUCCESS != rc) + if( MQTTASYNC_SUCCESS != rc ) { - if (MQTTASYNC_DISCONNECTED == rc) + if( MQTTASYNC_DISCONNECTED == rc ) { currentStatus = ConnectionStatus::Disconnected; } - setConnectionStatus(currentStatus); - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); + setConnectionStatus( currentStatus ); + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); m_operationResult[-200] = false; m_pendingOperations.erase(-200); - if (MQTTASYNC_DISCONNECTED == rc) + if( MQTTASYNC_DISCONNECTED == rc ) { return -1; } - // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); + // ("ClientPaho", std::string( "%1 - failed to disconnect, return code %2" ).arg( m_clientId ).arg( pahoAsyncErrorCodeToString(rc)) ); } if( wait ) @@ -351,64 +353,62 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) { - if (ConnectionStatus::DisconnectInProgress == m_connectionStatus) + if( ConnectionStatus::DisconnectInProgress == m_connectionStatus ) { // ("ClientPaho", "%1 - disconnect in progress, ignoring publish with qos %2 on topic %3", m_clientId, qos, message.topic()); return -1; } - else if (ConnectionStatus::Disconnected == m_connectionStatus) + else if( ConnectionStatus::Disconnected == m_connectionStatus ) { // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); connect( true ); } - if (!isValidTopic(message.topic())) + if( !isValidTopic(message.topic() ) ) { // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, message.topic()); } - if (qos > 2) + if( qos > 2 ) { qos = 2; } - else if (qos < 0) + else if( qos < 0 ) { qos = 0; } - std::unique_lock lck(m_mutex); if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) - // if (ConnectionStatus::Connected != m_connectionStatus || m_processPendingPublishes) { m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; }); if(ConnectionStatus::ReconnectInProgress == m_connectionStatus) { - LogDebug( "ClientPaho", "Adding publish to pending queue."); + LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." ); m_pendingPublishes.push_front(Publish{ qos, message }); return -1; } } - return publishInternal(message, qos); + return publishInternal( message, qos ); } void ClientPaho::publishPending() { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - if (!m_processPendingPublishes) + if( !m_processPendingPublishes ) { return; } } - if (ConnectionStatus::Connected != m_connectionStatus) + if( ConnectionStatus::Connected != m_connectionStatus ) { LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) ) } - while (!m_pendingPublishes.empty()) + while( !m_pendingPublishes.empty() ) { const auto& pub = m_pendingPublishes.back(); publishInternal(pub.data, pub.qos); @@ -424,23 +424,23 @@ void ClientPaho::publishPending() m_pendingPublishesReadyCV.notify_all(); } -std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std::function& cb) +std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function& cb ) { - if (ConnectionStatus::Connected != m_connectionStatus) + if( ConnectionStatus::Connected != m_connectionStatus ) { // MqttException, "Not connected" } - if (!isValidTopic(topic)) + if( !isValidTopic( topic ) ) { // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); } - if (qos > 2) + if( qos > 2 ) { qos = 2; } - else if (qos < 0) + else if( qos < 0 ) { qos = 0; } @@ -449,21 +449,27 @@ std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std: OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto itExisting = m_subscriptions.find(topic); - if (m_subscriptions.end() != itExisting) { - if (itExisting->second.qos == qos) { + if( m_subscriptions.end() != itExisting ) + { + if( itExisting->second.qos == qos ) + { return -1; } // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic); } auto itPending = m_pendingSubscriptions.find(topic); - if (m_pendingSubscriptions.end() != itPending) { - if (itPending->second.qos == qos) { - auto itToken = std::find_if(m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair& item) { return topic == item.second; }); - if (m_subscribeTokenToTopic.end() != itToken) { + if( m_pendingSubscriptions.end() != itPending ) + { + if( itPending->second.qos == qos ) + { + auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair& item) { return topic == item.second; } ); + if( m_subscribeTokenToTopic.end() != itToken ) + { return itToken->first; } - else { + else + { return -1; } } @@ -471,15 +477,15 @@ std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std: } std::string existingTopic{}; - if (isOverlappingInternal(topic, existingTopic)) + if( isOverlappingInternal( topic, existingTopic ) ) { // (OverlappingTopicException, "overlapping topic", existingTopic, topic); } // ("ClientPaho", "%1 - adding subscription on topic %2 to the pending subscriptions", m_clientId, topic); - m_pendingSubscriptions.emplace(std::make_pair(topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb })); + m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb } ) ); } - return subscribeInternal(topic, qos); + return subscribeInternal( topic, qos ); } void ClientPaho::resubscribe() @@ -490,26 +496,26 @@ void ClientPaho::resubscribe() std::copy(m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end())); } - for (const auto& s : pendingSubscriptions) + for( const auto& s : pendingSubscriptions ) { - subscribeInternal(s.first, s.second.qos); + subscribeInternal( s.first, s.second.qos ); } } std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) { { - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); bool found = false; - for (const auto& s : m_subscriptions) + for( const auto& s : m_subscriptions ) { - if (topic == s.first && qos == s.second.qos) + if( topic == s.first && qos == s.second.qos ) { found = true; break; } } - if (!found) + if( !found ) { return -1; } @@ -525,18 +531,18 @@ std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) // the insertion of the token into the pending operations. OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto rc = MQTTAsync_unsubscribe(m_client, topic.c_str(), &opts); - if (MQTTASYNC_SUCCESS != rc) + if( MQTTASYNC_SUCCESS != rc ) { // ("ClientPaho", "%1 - unsubscribe on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); } - if (!m_pendingOperations.insert(opts.token).second) + if( !m_pendingOperations.insert( opts.token ).second ) { // ("ClientPaho", "%1 unsubscribe - token %2 already in use", m_clientId, opts.token); } - m_operationResult.erase(opts.token); - if (m_unsubscribeTokenToTopic.count(opts.token) > 0) + m_operationResult.erase( opts.token ); + if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 ) { // ("ClientPaho", "%1 - token already in use, replacing unsubscribe from topic %2 with topic %3", m_clientId, m_unsubscribeTokenToTopic[opts.token], topic); } @@ -552,13 +558,14 @@ std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) void ClientPaho::unsubscribeAll() { - decltype(m_subscriptions) subscriptions{}; + decltype( m_subscriptions ) subscriptions{}; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); subscriptions = m_subscriptions; } - for (const auto& s : subscriptions) { + for( const auto& s : subscriptions ) + { this->unsubscribe(s.first, s.second.qos); } } @@ -708,31 +715,31 @@ std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos) return opts.token; } -void ClientPaho::setConnectionStatus(ConnectionStatus status) +void ClientPaho::setConnectionStatus( ConnectionStatus status ) { ConnectionStatus curStatus = m_connectionStatus; m_connectionStatus = status; - if (status != curStatus && m_connectionStatusCallback) + if( status != curStatus && m_connectionStatusCallback ) { - m_connectionStatusCallback(m_clientId, status); + m_connectionStatusCallback( m_clientId, status ); } } -bool ClientPaho::isOverlappingInternal(const std::string& topic, std::string& existingTopic) const +bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const { existingTopic.clear(); - for (const auto& s : m_pendingSubscriptions) + for( const auto& s : m_pendingSubscriptions ) { - if (testForOverlap(s.first, topic)) + if( testForOverlap( s.first, topic ) ) { existingTopic = s.first; return true; } } - for (const auto& s : m_subscriptions) + for( const auto& s : m_subscriptions ) { - if (testForOverlap(s.first, topic)) + if( testForOverlap(s.first, topic ) ) { existingTopic = s.first; return true; @@ -748,26 +755,25 @@ void ClientPaho::pushIncomingEvent(std::function ev) void ClientPaho::callbackEventHandler() { - LogDebug("ClientPaho", std::string( m_clientId + " - starting callback event handler") ); - for (;;) { + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler") ); + for( ;; ) + { std::vector> events; - if (!m_callbackEventQueue.pop(events)) + if( !m_callbackEventQueue.pop(events) ) { break; } - for (const auto& ev : events) + for( const auto& ev : events ) { ev(); - // ("ClientPaho", "%1 - Exception occurred: %2", m_clientId, mlogicException); } } // ("ClientPaho", "%1 - leaving callback event handler", m_clientId); } -void ClientPaho::onConnectOnInstance(const std::string& cause) +void ClientPaho::onConnectOnInstance( const std::string& cause ) { (void)cause; - // toLogFile ("ClientPaho", "onConnectOnInstance %1 - reconnected (cause %2)", m_clientId, cause); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end())); @@ -783,10 +789,10 @@ void ClientPaho::onConnectSuccessOnInstance() { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Register the connect callback that is used in reconnect scenarios. - auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect); - if (MQTTASYNC_SUCCESS != rc) + auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect ); + if( MQTTASYNC_SUCCESS != rc ) { - LogError( "[ClientPaho]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); + LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); } // For MQTTV5 @@ -800,19 +806,19 @@ void ClientPaho::onConnectSuccessOnInstance() m_pendingOperations.erase(-100); } - setConnectionStatus(ConnectionStatus::Connected); - if(m_connectPromise) + setConnectionStatus( ConnectionStatus::Connected ); + if( m_connectPromise ) { - LogDebug( "[ClientPaho]", std::string("connectPromise still present. Resetting!") ); + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!") ); m_connectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } -void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) +void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response ) { - (void)response; - LogDebug("ClientPaho", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")")); + (void) response; + LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")")); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); @@ -846,17 +852,18 @@ void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&) m_pendingOperations.clear(); } - setConnectionStatus(ConnectionStatus::Disconnected); + setConnectionStatus( ConnectionStatus::Disconnected ); - if (m_disconnectPromise) { + if( m_disconnectPromise ) + { m_disconnectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } -void ClientPaho::onDisconnectFailureOnInstance(const MqttFailure& response) +void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response ) { - (void)response; + (void) response; // ("ClientPaho", "onDisconnectFailureOnInstance %1 - disconnect failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); @@ -865,23 +872,23 @@ void ClientPaho::onDisconnectFailureOnInstance(const MqttFailure& response) m_pendingOperations.erase(-200); } - if (MQTTAsync_isConnected(m_client)) + if( MQTTAsync_isConnected( m_client ) ) { - setConnectionStatus(ConnectionStatus::Connected); + setConnectionStatus( ConnectionStatus::Connected ); } else { - setConnectionStatus(ConnectionStatus::Disconnected); + setConnectionStatus( ConnectionStatus::Disconnected ); } - if (m_disconnectPromise) + if( m_disconnectPromise ) { m_disconnectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } -void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess& response) +void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response ) { auto pd = response.publishData(); // ("ClientPaho", "onPublishSuccessOnInstance %1 - publish with token %2 succeeded (message was %3)", m_clientId, response.token(), pd.payload()); @@ -894,7 +901,7 @@ void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess& response) m_operationsCompleteCV.notify_all(); } -void ClientPaho::onPublishFailureOnInstance(const MqttFailure& response) +void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response ) { // ("ClientPaho", "onPublishFailureOnInstance %1 - publish with token %2 failed with code %3 (%4)", m_clientId, response.token(), response.codeToString(), response.message()); { @@ -906,7 +913,7 @@ void ClientPaho::onPublishFailureOnInstance(const MqttFailure& response) m_operationsCompleteCV.notify_all(); } -void ClientPaho::onSubscribeSuccessOnInstance(const MqttSuccess& response) +void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response ) { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscribe with token %2 succeeded", m_clientId, response.token()); OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); @@ -919,7 +926,8 @@ void ClientPaho::onSubscribeSuccessOnInstance(const MqttSuccess& response) m_pendingOperations.erase(response.token()); }); auto it = m_subscribeTokenToTopic.find(response.token()); - if (m_subscribeTokenToTopic.end() == it) { + if (m_subscribeTokenToTopic.end() == it) + { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, response.token()); return; }