diff --git a/src/clientpaho.cpp b/src/clientpaho.cpp index 6a5bf76..31026e6 100644 --- a/src/clientpaho.cpp +++ b/src/clientpaho.cpp @@ -94,10 +94,10 @@ struct Init std::atomic_int ClientPaho::s_numberOfInstances(0); -ClientPaho::ClientPaho(const std::string& _endpoint, +ClientPaho::ClientPaho( const std::string& _endpoint, const std::string& _id, - const std::function& connectionStatusCallback, - const std::function& deliveryCompleteCallback) + const std::function& connectionStatusCallback, + const std::function& deliveryCompleteCallback ) : m_mutex() , m_endpoint() , m_username() @@ -123,19 +123,19 @@ ClientPaho::ClientPaho(const std::string& _endpoint, , m_callbackEventQueue(m_clientId) , m_workerThread() { - if (0 == s_numberOfInstances++) + if( 0 == s_numberOfInstances++ ) { - MQTTAsync_setTraceCallback(&ClientPaho::onLogPaho); + MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho ); } - LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho ") ); + LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) ); parseEndpoint(_endpoint); - auto rc = MQTTAsync_create(&m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr); - if (MQTTASYNC_SUCCESS == rc) + auto rc = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr ); + if( MQTTASYNC_SUCCESS == rc ) { - MQTTAsync_setCallbacks(m_client, reinterpret_cast(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete); - m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this); + MQTTAsync_setCallbacks( m_client, reinterpret_cast(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete ); + m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this ); } else { @@ -145,33 +145,33 @@ ClientPaho::ClientPaho(const std::string& _endpoint, ClientPaho::~ClientPaho() { - LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - dtor ClientPao" ) ); + LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) ); if( MQTTAsync_isConnected( m_client ) ) { this->unsubscribeAll(); - this->waitForCompletion(std::chrono::milliseconds(2000), std::set{}); - this->disconnect(true, 5000); + this->waitForCompletion( std::chrono::milliseconds(2000), std::set{} ); + this->disconnect( true, 5000 ); } else { // If the status was already disconnected this call does nothing - setConnectionStatus(ConnectionStatus::Disconnected); + setConnectionStatus( ConnectionStatus::Disconnected ); } - if (0 == --s_numberOfInstances) + if( 0 == --s_numberOfInstances ) { // encountered a case where termination of the logging system within paho led to a segfault. // This was a paho thread that was cleaned while at the same time the logging system was terminated. // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less // frequently. - MQTTAsync_setTraceCallback(nullptr); + MQTTAsync_setTraceCallback( nullptr ); } - MQTTAsync_destroy(&m_client); + MQTTAsync_destroy( &m_client ); m_callbackEventQueue.stop(); - if (m_workerThread.joinable()) + if( m_workerThread.joinable() ) { m_workerThread.join(); } @@ -212,11 +212,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast(this), ClientPaho::onFirstConnect ); if( MQTTASYNC_SUCCESS == ccb ) { - LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") ); + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") ); } else { - LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") ); + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") ); } // Setup the last will and testament, if so desired. @@ -235,7 +235,6 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) conn_opts.will = nullptr; } - if( !m_username.empty() ) { conn_opts.username = m_username.c_str(); @@ -311,14 +310,14 @@ std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs ) { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - if (!m_pendingOperations.insert(-200).second) + if( !m_pendingOperations.insert( -200 ).second ) { - //"ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200) + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) ); } - m_operationResult.erase(-200); + m_operationResult.erase( -200 ); } - int rc = MQTTAsync_disconnect(m_client, &disconn_opts); + int rc = MQTTAsync_disconnect( m_client, &disconn_opts ); if( MQTTASYNC_SUCCESS != rc ) { if( MQTTASYNC_DISCONNECTED == rc ) @@ -329,21 +328,20 @@ std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs ) setConnectionStatus( currentStatus ); OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); m_operationResult[-200] = false; - m_pendingOperations.erase(-200); + m_pendingOperations.erase( -200 ); if( MQTTASYNC_DISCONNECTED == rc ) { return -1; } - // ("ClientPaho", std::string( "%1 - failed to disconnect, return code %2" ).arg( m_clientId ).arg( pahoAsyncErrorCodeToString(rc)) ); + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) ); } if( wait ) { if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) { - // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId); - + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) ); } waitForDisconnect.get(); m_disconnectPromise.reset(); @@ -351,22 +349,22 @@ std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs ) return -200; } -std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) +std::int32_t ClientPaho::publish( const MqttMessage& message, int qos ) { if( ConnectionStatus::DisconnectInProgress == m_connectionStatus ) { - // ("ClientPaho", "%1 - disconnect in progress, ignoring publish with qos %2 on topic %3", m_clientId, qos, message.topic()); + LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) ); return -1; } else if( ConnectionStatus::Disconnected == m_connectionStatus ) { - // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) ); connect( true ); } if( !isValidTopic(message.topic() ) ) { - // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, message.topic()); + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) ); } if( qos > 2 ) @@ -379,13 +377,13 @@ std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) } std::unique_lock lck(m_mutex); - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes ) { m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; }); - if(ConnectionStatus::ReconnectInProgress == m_connectionStatus) + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus ) { LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." ); - m_pendingPublishes.push_front(Publish{ qos, message }); + m_pendingPublishes.push_front( Publish{ qos, message } ); return -1; } } @@ -411,8 +409,7 @@ void ClientPaho::publishPending() while( !m_pendingPublishes.empty() ) { const auto& pub = m_pendingPublishes.back(); - publishInternal(pub.data, pub.qos); - // else ("ClientPaho", "%1 - pending publish on topic %2 failed : %3", m_clientId, pub.data.topic(), e.what()); + publishInternal( pub.data, pub.qos ); m_pendingPublishes.pop_back(); } @@ -482,18 +479,18 @@ std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std // (OverlappingTopicException, "overlapping topic", existingTopic, topic); } - // ("ClientPaho", "%1 - adding subscription on topic %2 to the pending subscriptions", m_clientId, topic); - m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb } ) ); + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) ); + m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) ); } return subscribeInternal( topic, qos ); } void ClientPaho::resubscribe() { - decltype(m_pendingSubscriptions) pendingSubscriptions{}; + decltype( m_pendingSubscriptions ) pendingSubscriptions{}; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - std::copy(m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end())); + std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) ); } for( const auto& s : pendingSubscriptions ) @@ -530,21 +527,21 @@ std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - auto rc = MQTTAsync_unsubscribe(m_client, topic.c_str(), &opts); + auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts ); if( MQTTASYNC_SUCCESS != rc ) { - // ("ClientPaho", "%1 - unsubscribe on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); } if( !m_pendingOperations.insert( opts.token ).second ) { - // ("ClientPaho", "%1 unsubscribe - token %2 already in use", m_clientId, opts.token); + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token ) + " already in use" ) ); } m_operationResult.erase( opts.token ); if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 ) { - // ("ClientPaho", "%1 - token already in use, replacing unsubscribe from topic %2 with topic %3", m_clientId, m_unsubscribeTokenToTopic[opts.token], topic); + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) ); } m_lastUnsubscribe = opts.token; // centos7 workaround m_unsubscribeTokenToTopic[opts.token] = topic; @@ -566,32 +563,34 @@ void ClientPaho::unsubscribeAll() for( const auto& s : subscriptions ) { - this->unsubscribe(s.first, s.second.qos); + this->unsubscribe( s.first, s.second.qos ); } } std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set& tokens) const { - if (waitFor <= std::chrono::milliseconds(0)) { - return std::chrono::milliseconds(0); + if( waitFor <= std::chrono::milliseconds( 0 ) ) + { + return std::chrono::milliseconds( 0 ); } std::chrono::milliseconds timeElapsed{}; { - osdev::components::mqtt::measurement::TimeMeasurement msr("waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds) + osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds ) { - timeElapsed = std::chrono::ceil(sinceStart); + timeElapsed = std::chrono::ceil( sinceStart ); }); std::unique_lock lck(m_mutex); + // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations); m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]() { - if (tokens.empty()) + if( tokens.empty() ) { // wait for all operations to end return m_pendingOperations.empty(); } - else if (tokens.size() == 1) + else if( tokens.size() == 1 ) { - return m_pendingOperations.find(*tokens.cbegin()) == m_pendingOperations.end(); + return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end(); } std::vector intersect{}; std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect)); @@ -601,16 +600,16 @@ std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::millisecond return timeElapsed; } -bool ClientPaho::isOverlapping(const std::string& topic) const +bool ClientPaho::isOverlapping( const std::string& topic ) const { std::string existingTopic{}; - return isOverlapping(topic, existingTopic); + return isOverlapping( topic, existingTopic ); } -bool ClientPaho::isOverlapping(const std::string& topic, std::string& existingTopic) const +bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - return isOverlappingInternal(topic, existingTopic); + return isOverlappingInternal( topic, existingTopic ); } std::vector ClientPaho::pendingOperations() const @@ -618,7 +617,7 @@ std::vector ClientPaho::pendingOperations() const OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::vector retval{}; retval.resize(m_pendingOperations.size()); - std::copy(m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin()); + std::copy( m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin() ); return retval; } @@ -628,36 +627,36 @@ bool ClientPaho::hasPendingSubscriptions() const return !m_pendingSubscriptions.empty(); } -boost::optional ClientPaho::operationResult(std::int32_t token) const +boost::optional ClientPaho::operationResult( std::int32_t token ) const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); boost::optional ret{}; - auto cit = m_operationResult.find(token); - if (m_operationResult.end() != cit) + auto cit = m_operationResult.find( token ); + if( m_operationResult.end() != cit ) { ret = cit->second; } return ret; } -void ClientPaho::parseEndpoint(const std::string& _endpoint) +void ClientPaho::parseEndpoint( const std::string& _endpoint ) { - auto ep = UriParser::parse(_endpoint); - if (ep.find("user") != ep.end()) + auto ep = UriParser::parse( _endpoint ); + if( ep.find( "user" ) != ep.end() ) { m_username = ep["user"]; ep["user"].clear(); } - if (ep.find("password") != ep.end()) + if( ep.find( "password" ) != ep.end() ) { m_password = ep["password"]; ep["password"].clear(); } - m_endpoint = UriParser::toString(ep); + m_endpoint = UriParser::toString( ep ); } -std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos) +std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos ) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onPublishSuccess; @@ -671,20 +670,20 @@ std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos) // OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto rc = MQTTAsync_sendMessage(m_client, message.topic().c_str(), &msg, &opts); - if (MQTTASYNC_SUCCESS != rc) + if( MQTTASYNC_SUCCESS != rc ) { - // ("ClientPaho", "%1 - publish on topic %2 failed with code %3", m_clientId, message.topic(), pahoAsyncErrorCodeToString(rc)); + LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); } - if (!m_pendingOperations.insert(opts.token).second) + if( !m_pendingOperations.insert( opts.token ).second ) { - // ("ClientPaho", "%1 publishInternal - token %2 already in use", m_clientId, opts.token); + LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) ); } - m_operationResult.erase(opts.token); + m_operationResult.erase( opts.token ); return opts.token; } -std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos) +std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos ) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onSubscribeSuccess; @@ -697,19 +696,18 @@ std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos) auto rc = MQTTAsync_subscribe(m_client, topic.c_str(), qos, &opts); if (MQTTASYNC_SUCCESS != rc) { - m_pendingSubscriptions.erase(topic); - // ("ClientPaho", "%1 - subscription on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); - // (MqttException, "Subscription failed"); + m_pendingSubscriptions.erase( topic ); + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); } - if (!m_pendingOperations.insert(opts.token).second) + if( !m_pendingOperations.insert( opts.token ).second ) { - // ("ClientPaho", "%1 subscribe - token %2 already in use", m_clientId, opts.token); + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) ); } - m_operationResult.erase(opts.token); - if (m_subscribeTokenToTopic.count(opts.token) > 0) + m_operationResult.erase( opts.token ); + if( m_subscribeTokenToTopic.count( opts.token ) > 0 ) { - // ("ClientPaho", "%1 - overwriting pending subscription on topic %2 with topic %3", m_clientId, m_subscribeTokenToTopic[opts.token], topic); + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) ); } m_subscribeTokenToTopic[opts.token] = topic; return opts.token; @@ -755,7 +753,7 @@ void ClientPaho::pushIncomingEvent(std::function ev) void ClientPaho::callbackEventHandler() { - LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler") ); + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) ); for( ;; ) { std::vector> events; @@ -773,7 +771,7 @@ void ClientPaho::callbackEventHandler() } void ClientPaho::onConnectOnInstance( const std::string& cause ) { - (void)cause; + (void) cause; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end())); @@ -809,7 +807,7 @@ void ClientPaho::onConnectSuccessOnInstance() setConnectionStatus( ConnectionStatus::Connected ); if( m_connectPromise ) { - LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!") ); + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) ); m_connectPromise->set_value(); } m_operationsCompleteCV.notify_all(); @@ -825,9 +823,9 @@ void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response ) m_operationResult[-100] = false; m_pendingOperations.erase(-100); } - if (ConnectionStatus::ConnectInProgress == m_connectionStatus) + if( ConnectionStatus::ConnectInProgress == m_connectionStatus ) { - setConnectionStatus(ConnectionStatus::Disconnected); + setConnectionStatus( ConnectionStatus::Disconnected ); } m_operationsCompleteCV.notify_all(); } @@ -837,9 +835,9 @@ void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response ) // MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode)); //} -void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&) +void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& ) { - // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - disconnected from endpoint %2", m_clientId, m_endpoint); + LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); m_subscriptions.clear(); @@ -864,9 +862,9 @@ void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&) void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response ) { (void) response; - // ("ClientPaho", "onDisconnectFailureOnInstance %1 - disconnect failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); + LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) ); { - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); + OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations); m_operationResult[-200] = false; m_pendingOperations.erase(-200); @@ -891,7 +889,7 @@ void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response ) void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response ) { auto pd = response.publishData(); - // ("ClientPaho", "onPublishSuccessOnInstance %1 - publish with token %2 succeeded (message was %3)", m_clientId, response.token(), pd.payload()); + LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); @@ -903,7 +901,7 @@ void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response ) void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response ) { - // ("ClientPaho", "onPublishFailureOnInstance %1 - publish with token %2 failed with code %3 (%4)", m_clientId, response.token(), response.codeToString(), response.message()); + LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); @@ -915,77 +913,80 @@ void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response ) void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response ) { - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscribe with token %2 succeeded", m_clientId, response.token()); - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) ); + + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); bool operationOk = false; - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response, &operationOk]() + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]() { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = operationOk; - m_pendingOperations.erase(response.token()); + m_pendingOperations.erase( response.token() ); }); - auto it = m_subscribeTokenToTopic.find(response.token()); - if (m_subscribeTokenToTopic.end() == it) + auto it = m_subscribeTokenToTopic.find( response.token() ); + if( m_subscribeTokenToTopic.end() == it ) { - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, response.token()); + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); return; } auto topic = it->second; m_subscribeTokenToTopic.erase(it); - auto pendingIt = m_pendingSubscriptions.find(topic); - if (m_pendingSubscriptions.end() == pendingIt) + auto pendingIt = m_pendingSubscriptions.find( topic ); + if( m_pendingSubscriptions.end() == pendingIt ) { - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) ); return; } - if (response.qos() != pendingIt->second.qos) + if( response.qos() != pendingIt->second.qos ) { - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscription requested qos %2, endpoint assigned qos %3", m_clientId, pendingIt->second.qos, response.qos()); + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) ); } - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - move pending subscription on topic %2 to the registered subscriptions", m_clientId, topic); - m_subscriptions.emplace(std::make_pair(pendingIt->first, std::move(pendingIt->second))); - m_pendingSubscriptions.erase(pendingIt); + + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) ); + m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) ); + m_pendingSubscriptions.erase( pendingIt ); operationOk = true; } -void ClientPaho::onSubscribeFailureOnInstance(const MqttFailure& response) +void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response ) { - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); + + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]() { // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; - m_pendingOperations.erase(response.token()); + m_pendingOperations.erase( response.token() ); }); - auto it = m_subscribeTokenToTopic.find(response.token()); - if (m_subscribeTokenToTopic.end() == it) + auto it = m_subscribeTokenToTopic.find( response.token() ); + if( m_subscribeTokenToTopic.end() == it ) { - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); return; } auto topic = it->second; - m_subscribeTokenToTopic.erase(it); + m_subscribeTokenToTopic.erase( it ); - auto pendingIt = m_pendingSubscriptions.find(topic); - if (m_pendingSubscriptions.end() == pendingIt) + auto pendingIt = m_pendingSubscriptions.find( topic ); + if( m_pendingSubscriptions.end() == pendingIt ) { - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) ); return; } - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - remove pending subscription on topic %2", m_clientId, topic); - m_pendingSubscriptions.erase(pendingIt); + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) ); + m_pendingSubscriptions.erase( pendingIt ); } -void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response) +void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response ) { - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unsubscribe with token %2 succeeded", m_clientId, response.token()); + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) ); - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token. @@ -994,116 +995,117 @@ void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response) // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled // sequentially (see ClientPaho::unsubscribe)! auto token = response.token(); - if (-1 == token) + if( -1 == token ) { token = m_lastUnsubscribe; m_lastUnsubscribe = -1; } bool operationOk = false; - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, token, &operationOk]() + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]() { // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token); m_operationResult[token] = operationOk; - m_pendingOperations.erase(token); + m_pendingOperations.erase( token ); }); - auto it = m_unsubscribeTokenToTopic.find(token); - if (m_unsubscribeTokenToTopic.end() == it) + auto it = m_unsubscribeTokenToTopic.find( token ); + if( m_unsubscribeTokenToTopic.end() == it ) { - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, token); + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) ); return; } auto topic = it->second; - m_unsubscribeTokenToTopic.erase(it); + m_unsubscribeTokenToTopic.erase( it ); - auto registeredIt = m_subscriptions.find(topic); - if (m_subscriptions.end() == registeredIt) { - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - cannot find subscription for token %2", m_clientId, response.token()); + auto registeredIt = m_subscriptions.find( topic ); + if( m_subscriptions.end() == registeredIt ) + { + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) ); return; } - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - remove subscription on topic %2 from the registered subscriptions", m_clientId, topic); - m_subscriptions.erase(registeredIt); + + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) ); + m_subscriptions.erase( registeredIt ); operationOk = true; } -void ClientPaho::onUnsubscribeFailureOnInstance(const MqttFailure& response) +void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response ) { - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]() { // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; - m_pendingOperations.erase(response.token()); + m_pendingOperations.erase( response.token() ); }); - auto it = m_unsubscribeTokenToTopic.find(response.token()); - if (m_unsubscribeTokenToTopic.end() == it) + auto it = m_unsubscribeTokenToTopic.find( response.token() ); + if( m_unsubscribeTokenToTopic.end() == it ) { - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); return; } auto topic = it->second; - m_unsubscribeTokenToTopic.erase(it); + m_unsubscribeTokenToTopic.erase( it ); } -int ClientPaho::onMessageArrivedOnInstance(const MqttMessage& message) +int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message ) { - // ("ClientPaho", "onMessageArrivedOnInstance %1 - received message on topic %2, retained : %3, dup : %4", m_clientId, message.topic(), message.retained(), message.duplicate()); - - std::function cb; + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) ); + std::function cb; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - for (const auto& s : m_subscriptions) + for( const auto& s : m_subscriptions ) { - if (boost::regex_match(message.topic(), s.second.topicRegex)) + if( boost::regex_match( message.topic(), s.second.topicRegex ) ) { cb = s.second.callback; } } } - if (cb) + if( cb ) { - cb(message); + cb( message ); } else { - // ("ClientPaho", "onMessageArrivedOnInstance %1 - no topic filter found for message received on topic %2", m_clientId, message.topic()); + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) ); } return 1; } -void ClientPaho::onDeliveryCompleteOnInstance(MQTTAsync_token token) +void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token ) { - // ("ClientPaho", "onDeliveryCompleteOnInstance %1 - message with token %2 is delivered", m_clientId, token); - if (m_deliveryCompleteCallback) + LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) ); + if( m_deliveryCompleteCallback ) { - m_deliveryCompleteCallback(m_clientId, static_cast(token)); + m_deliveryCompleteCallback( m_clientId, static_cast( token ) ); } } -void ClientPaho::onConnectionLostOnInstance(const std::string& cause) +void ClientPaho::onConnectionLostOnInstance( const std::string& cause ) { - (void)cause; + (void) cause; // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause); - setConnectionStatus(ConnectionStatus::ReconnectInProgress); + setConnectionStatus( ConnectionStatus::ReconnectInProgress ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Remove all tokens related to subscriptions from the active operations. - for (const auto& p : m_subscribeTokenToTopic) + for( const auto& p : m_subscribeTokenToTopic ) { // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); - m_pendingOperations.erase(p.first); + m_pendingOperations.erase( p.first ); } - for (const auto& p : m_unsubscribeTokenToTopic) + for( const auto& p : m_unsubscribeTokenToTopic ) { // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); - m_pendingOperations.erase(p.first); + m_pendingOperations.erase( p.first ); } // Clear the administration used in the subscribe process. m_subscribeTokenToTopic.clear(); @@ -1111,55 +1113,55 @@ void ClientPaho::onConnectionLostOnInstance(const std::string& cause) } // static -void ClientPaho::onFirstConnect(void* context, char* cause) +void ClientPaho::onFirstConnect( void* context, char* cause ) { LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." ); - if(context) + if( context ) { - auto *cl = reinterpret_cast(context); - std::string reason(nullptr == cause ? "Unknown cause" : cause); - cl->pushIncomingEvent([cl, reason]() { cl->onConnectSuccessOnInstance(); }); + auto *cl = reinterpret_cast( context ); + std::string reason( nullptr == cause ? "Unknown cause" : cause ); + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } ); } } -void ClientPaho::onConnect(void* context, char* cause) +void ClientPaho::onConnect( void* context, char* cause ) { LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." ); - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - std::string reason(nullptr == cause ? "unknown cause" : cause); - cl->pushIncomingEvent([cl, reason]() { cl->onConnectOnInstance(reason); }); + auto* cl = reinterpret_cast( context ); + std::string reason( nullptr == cause ? "unknown cause" : cause ); + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } ); } } // static -void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response) +void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response ) { LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." ); - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - if (!response) + auto* cl = reinterpret_cast( context ); + if( !response ) { // connect should always have a valid response struct. - LogError( "[ClientPaho]", "onConnectSuccess - no response data"); + LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" ); return; } // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent)); - cl->pushIncomingEvent([cl]() { cl->onConnectSuccessOnInstance(); }); + cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } ); } } // static -void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response) +void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response ) { - LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" )); - if (context) + LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) ); + if( context ) { - auto* cl = reinterpret_cast(context); - MqttFailure resp(response); - cl->pushIncomingEvent([cl, resp]() { cl->onConnectFailureOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttFailure resp( response ); + cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } ); } } @@ -1184,35 +1186,35 @@ void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response //} // static -void ClientPaho::onDisconnectSuccess(void* context, MQTTAsync_successData* response) +void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - MqttSuccess resp(response ? response->token : 0); - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectSuccessOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttSuccess resp( response ? response->token : 0 ); + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } ); } } // static -void ClientPaho::onDisconnectFailure(void* context, MQTTAsync_failureData* response) +void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response ) { LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." ); - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - MqttFailure resp(response); - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectFailureOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttFailure resp( response ); + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } ); } } // static -void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response) +void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - if (!response) + auto* cl = reinterpret_cast( context ); + if( !response ) { // publish should always have a valid response struct. // toLogFile ("ClientPaho", "onPublishSuccess - no response data"); @@ -1223,134 +1225,137 @@ void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response } // static -void ClientPaho::onPublishFailure(void* context, MQTTAsync_failureData* response) +void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response ) { - (void)response; - if (context) + (void) response; + if( context ) { - auto* cl = reinterpret_cast(context); - MqttFailure resp(response); - cl->pushIncomingEvent([cl, resp]() { cl->onPublishFailureOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttFailure resp( response ); + cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } ); } } // static -void ClientPaho::onSubscribeSuccess(void* context, MQTTAsync_successData* response) +void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - if (!response) + auto* cl = reinterpret_cast( context ); + if( !response ) { // subscribe should always have a valid response struct. // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data"); } - MqttSuccess resp(response->token, response->alt.qos); - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeSuccessOnInstance(resp); }); + MqttSuccess resp( response->token, response->alt.qos ); + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } ); } } // static -void ClientPaho::onSubscribeFailure(void* context, MQTTAsync_failureData* response) +void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - MqttFailure resp(response); - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeFailureOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttFailure resp( response ); + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } ); } } // static -void ClientPaho::onUnsubscribeSuccess(void* context, MQTTAsync_successData* response) +void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - MqttSuccess resp(response ? response->token : -1); - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeSuccessOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttSuccess resp( response ? response->token : -1 ); + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } ); } } // static -void ClientPaho::onUnsubscribeFailure(void* context, MQTTAsync_failureData* response) +void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - MqttFailure resp(response); - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeFailureOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttFailure resp( response ); + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } ); } } // static -int ClientPaho::onMessageArrived(void* context, char* topicName, int, MQTTAsync_message* message) +int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message ) { - OSDEV_COMPONENTS_SCOPEGUARD(freeMessage, [&topicName, &message]() + OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]() { - MQTTAsync_freeMessage(&message); - MQTTAsync_free(topicName); + MQTTAsync_freeMessage( &message ); + MQTTAsync_free( topicName ); }); - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - MqttMessage msg(topicName, *message); - cl->pushIncomingEvent([cl, msg]() { cl->onMessageArrivedOnInstance(msg); }); + auto* cl = reinterpret_cast( context ); + MqttMessage msg( topicName, *message ); + cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } ); } return 1; // always return true. Otherwise this callback is triggered again. } // static -void ClientPaho::onDeliveryComplete(void* context, MQTTAsync_token token) +void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - cl->pushIncomingEvent([cl, token]() { cl->onDeliveryCompleteOnInstance(token); }); + auto* cl = reinterpret_cast( context ); + cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } ); } } // static -void ClientPaho::onConnectionLost(void* context, char* cause) +void ClientPaho::onConnectionLost( void* context, char* cause ) { - OSDEV_COMPONENTS_SCOPEGUARD(freeCause, [&cause]() + OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]() { - if (cause) + if( cause ) { - MQTTAsync_free(cause); + MQTTAsync_free( cause ); } }); - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - std::string msg(nullptr == cause ? "cause unknown" : cause); - cl->pushIncomingEvent([cl, msg]() { cl->onConnectionLostOnInstance(msg); }); + auto* cl = reinterpret_cast( context ); + std::string msg( nullptr == cause ? "cause unknown" : cause ); + cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } ); } } // static -void ClientPaho::onLogPaho(enum MQTTASYNC_TRACE_LEVELS level, char* message) +void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message ) { - (void)message; - switch (level) + (void) message; + switch( level ) { case MQTTASYNC_TRACE_MAXIMUM: case MQTTASYNC_TRACE_MEDIUM: - case MQTTASYNC_TRACE_MINIMUM: { + case MQTTASYNC_TRACE_MINIMUM: + { // ("ClientPaho", "paho - %1", message) break; } - case MQTTASYNC_TRACE_PROTOCOL: { + case MQTTASYNC_TRACE_PROTOCOL: + { // ("ClientPaho", "paho - %1", message) break; } case MQTTASYNC_TRACE_ERROR: case MQTTASYNC_TRACE_SEVERE: - case MQTTASYNC_TRACE_FATAL: { + case MQTTASYNC_TRACE_FATAL: + { // ("ClientPaho", "paho - %1", message) break; }