/* **************************************************************************** * Copyright 2019 Open Systems Development BV * * * * Permission is hereby granted, free of charge, to any person obtaining a * * copy of this software and associated documentation files (the "Software"), * * to deal in the Software without restriction, including without limitation * * the rights to use, copy, modify, merge, publish, distribute, sublicense, * * and/or sell copies of the Software, and to permit persons to whom the * * Software is furnished to do so, subject to the following conditions: * * * * The above copyright notice and this permission notice shall be included in * * all copies or substantial portions of the Software. * * * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * * DEALINGS IN THE SOFTWARE. * * ***************************************************************************/ #include "clientpaho.h" #include "errorcode.h" #include "mqttutil.h" #include "lockguard.h" #include "log.h" #include "metaprogrammingdefs.h" #include "mqttstream.h" #include "scopeguard.h" #include "uriparser.h" // std::chrono #include "compat-chrono.h" // std #include #include using namespace osdev::components::mqtt; namespace { #if defined(__clang__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-template" #endif OSDEV_COMPONENTS_HASMEMBER_TRAIT(onSuccess5) template inline typename std::enable_if::value, TRet>::type initializeMqttStruct(TRet*) { return MQTTAsync_disconnectOptions_initializer; } template inline typename std::enable_if::value, TRet>::type initializeMqttStruct(TRet*) { // For some reason g++ on centos7 evaluates the function body even when it is discarded by SFINAE. // This leads to a compile error on an undefined symbol. We will use the old initializer macro, but this // method should not be chosen when the struct does not contain member onSuccess5! // On yocto warrior mqtt-paho-c 1.3.0 the macro MQTTAsync_disconnectOptions_initializer5 is not defined. // while the struct does have an onSuccess5 member. In that case we do need correct initializer code. // We fall back to the MQTTAsync_disconnectOptions_initializer macro and initialize // additional fields ourself (which unfortunately results in a pesky compiler warning about missing field initializers). #ifndef MQTTAsync_disconnectOptions_initializer5 #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wmissing-field-initializers" TRet ret = MQTTAsync_disconnectOptions_initializer; ret.struct_version = 1; ret.onSuccess5 = nullptr; ret.onFailure5 = nullptr; return ret; #pragma GCC diagnostic pop #else return MQTTAsync_disconnectOptions_initializer5; #endif } template struct Init { static TRet initialize() { return initializeMqttStruct(static_cast(nullptr)); } }; #if defined(__clang__) #pragma GCC diagnostic pop #endif } // namespace std::atomic_int ClientPaho::s_numberOfInstances(0); ClientPaho::ClientPaho( const std::string& _endpoint, const std::string& _id, const std::function& connectionStatusCallback, const std::function& deliveryCompleteCallback ) : m_mutex() , m_endpoint() , m_username() , m_password() , m_clientId(_id) , m_pendingOperations() , m_operationResult() , m_operationsCompleteCV() , m_subscriptions() , m_pendingSubscriptions() , m_subscribeTokenToTopic() , m_unsubscribeTokenToTopic() , m_pendingPublishes() , m_processPendingPublishes(false) , m_pendingPublishesReadyCV() , m_client() , m_connectionStatus(ConnectionStatus::Disconnected) , m_connectionStatusCallback(connectionStatusCallback) , m_deliveryCompleteCallback(deliveryCompleteCallback) , m_lastUnsubscribe(-1) , m_connectPromise() , m_disconnectPromise() , m_callbackEventQueue(m_clientId) , m_workerThread() { if( 0 == s_numberOfInstances++ ) { MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho ); } LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) ); parseEndpoint(_endpoint); auto rc = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr ); if( MQTTASYNC_SUCCESS == rc ) { MQTTAsync_setCallbacks( m_client, reinterpret_cast(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete ); m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this ); } else { LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( rc ) ) ); } } ClientPaho::~ClientPaho() { LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) ); if( MQTTAsync_isConnected( m_client ) ) { this->unsubscribeAll(); this->waitForCompletion( std::chrono::milliseconds(2000), std::set{} ); this->disconnect( true, 5000 ); } else { // If the status was already disconnected this call does nothing setConnectionStatus( ConnectionStatus::Disconnected ); } if( 0 == --s_numberOfInstances ) { // encountered a case where termination of the logging system within paho led to a segfault. // This was a paho thread that was cleaned while at the same time the logging system was terminated. // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less // frequently. MQTTAsync_setTraceCallback( nullptr ); } MQTTAsync_destroy( &m_client ); m_callbackEventQueue.stop(); if( m_workerThread.joinable() ) { m_workerThread.join(); } } std::string ClientPaho::clientId() const { return m_clientId; } ConnectionStatus ClientPaho::connectionStatus() const { return m_connectionStatus; } std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if( ConnectionStatus::Disconnected != m_connectionStatus ) { return -1; } setConnectionStatus( ConnectionStatus::ConnectInProgress ); } LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) ); MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; conn_opts.keepAliveInterval = 5; conn_opts.cleansession = 1; conn_opts.onSuccess = nullptr; conn_opts.onFailure = &ClientPaho::onConnectFailure; conn_opts.context = this; conn_opts.automaticReconnect = 1; // Make sure we get a signal if the promise is fulfilled auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast(this), ClientPaho::onFirstConnect ); if( MQTTASYNC_SUCCESS == ccb ) { LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") ); } else { LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") ); } // Setup the last will and testament, if so desired. if( !lwt.topic().empty() ) { MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer; will_opts.message = lwt.message().c_str(); will_opts.topicName = lwt.topic().c_str(); conn_opts.will = &will_opts; LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) ); } else { conn_opts.will = nullptr; } if( !m_username.empty() ) { conn_opts.username = m_username.c_str(); } if( !m_password.empty() ) { conn_opts.password = m_password.c_str(); } std::promise waitForConnectPromise{}; auto waitForConnect = waitForConnectPromise.get_future(); m_connectPromise.reset(); if( wait ) { m_connectPromise = std::make_unique>( std::move( waitForConnectPromise ) ); } { OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); if( !m_pendingOperations.insert( -100 ).second ) { // Write something } m_operationResult.erase( -100 ); } int rc = MQTTAsync_connect( m_client, &conn_opts ); if( MQTTASYNC_SUCCESS != rc ) { setConnectionStatus( ConnectionStatus::Disconnected ); OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); m_operationResult[-100] = false; m_pendingOperations.erase(-100); } if( wait ) { waitForConnect.get(); m_connectPromise.reset(); } return -100; } std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs ) { ConnectionStatus currentStatus = m_connectionStatus; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus ) { return -1; } currentStatus = m_connectionStatus; setConnectionStatus( ConnectionStatus::DisconnectInProgress ); } MQTTAsync_disconnectOptions disconn_opts = Init::initialize(); disconn_opts.timeout = timeoutMs; disconn_opts.onSuccess = &ClientPaho::onDisconnectSuccess; disconn_opts.onFailure = &ClientPaho::onDisconnectFailure; disconn_opts.context = this; std::promise waitForDisconnectPromise{}; auto waitForDisconnect = waitForDisconnectPromise.get_future(); m_disconnectPromise.reset(); if( wait ) { m_disconnectPromise = std::make_unique>(std::move(waitForDisconnectPromise)); } { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if( !m_pendingOperations.insert( -200 ).second ) { LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) ); } m_operationResult.erase( -200 ); } int rc = MQTTAsync_disconnect( m_client, &disconn_opts ); if( MQTTASYNC_SUCCESS != rc ) { if( MQTTASYNC_DISCONNECTED == rc ) { currentStatus = ConnectionStatus::Disconnected; } setConnectionStatus( currentStatus ); OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); m_operationResult[-200] = false; m_pendingOperations.erase( -200 ); if( MQTTASYNC_DISCONNECTED == rc ) { return -1; } LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) ); } if( wait ) { if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) { LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) ); } waitForDisconnect.get(); m_disconnectPromise.reset(); } return -200; } std::int32_t ClientPaho::publish( const MqttMessage& message, int qos ) { if( ConnectionStatus::DisconnectInProgress == m_connectionStatus ) { LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) ); return -1; } else if( ConnectionStatus::Disconnected == m_connectionStatus ) { LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) ); connect( true ); } if( !isValidTopic(message.topic() ) ) { LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) ); } if( qos > 2 ) { qos = 2; } else if( qos < 0 ) { qos = 0; } std::unique_lock lck(m_mutex); if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || ConnectionStatus::ConnectInProgress == m_connectionStatus || m_processPendingPublishes ) { m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; }); if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || ConnectionStatus::ConnectInProgress == m_connectionStatus ) { LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." ); m_pendingPublishes.push_front( Publish{ qos, message } ); return -1; } } return publishInternal( message, qos ); } void ClientPaho::publishPending() { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if( !m_processPendingPublishes ) { return; } } if( ConnectionStatus::Connected != m_connectionStatus ) { LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) ) } while( !m_pendingPublishes.empty() ) { const auto& pub = m_pendingPublishes.back(); publishInternal( pub.data, pub.qos ); m_pendingPublishes.pop_back(); } { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); m_processPendingPublishes = false; } m_pendingPublishesReadyCV.notify_all(); } std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function& cb ) { if( ConnectionStatus::Connected != m_connectionStatus ) { LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) ); } if( !isValidTopic( topic ) ) { // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) ); return -1; } if( qos > 2 ) { qos = 2; } else if( qos < 0 ) { qos = 0; } { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto itExisting = m_subscriptions.find(topic); if( m_subscriptions.end() != itExisting ) { if( itExisting->second.qos == qos ) { return -1; } // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic); } auto itPending = m_pendingSubscriptions.find(topic); if( m_pendingSubscriptions.end() != itPending ) { if( itPending->second.qos == qos ) { auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair& item) { return topic == item.second; } ); if( m_subscribeTokenToTopic.end() != itToken ) { return itToken->first; } else { return -1; } } // (OverlappingTopicException, "pending subscription with same topic, but different qos", topic); } std::string existingTopic{}; if( isOverlappingInternal( topic, existingTopic ) ) { // (OverlappingTopicException, "overlapping topic", existingTopic, topic); LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) ); } LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) ); m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) ); } return subscribeInternal( topic, qos ); } void ClientPaho::resubscribe() { decltype( m_pendingSubscriptions ) pendingSubscriptions{}; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) ); } for( const auto& s : pendingSubscriptions ) { subscribeInternal( s.first, s.second.qos ); } } std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) { { OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); bool found = false; for( const auto& s : m_subscriptions ) { if( topic == s.first && qos == s.second.qos ) { found = true; break; } } if( !found ) { return -1; } } MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onUnsubscribeSuccess; opts.onFailure = &ClientPaho::onUnsubscribeFailure; opts.context = this; { // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts ); if( MQTTASYNC_SUCCESS != rc ) { LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); } if( !m_pendingOperations.insert( opts.token ).second ) { LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token ) + " already in use" ) ); } m_operationResult.erase( opts.token ); if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 ) { LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) ); } m_lastUnsubscribe = opts.token; // centos7 workaround m_unsubscribeTokenToTopic[opts.token] = topic; } // Because of a bug in paho-c on centos7 the unsubscribes need to be sequential (best effort). this->waitForCompletion(std::chrono::seconds(1), std::set{ opts.token }); return opts.token; } void ClientPaho::unsubscribeAll() { decltype( m_subscriptions ) subscriptions{}; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); subscriptions = m_subscriptions; } for( const auto& s : subscriptions ) { this->unsubscribe( s.first, s.second.qos ); } } std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set& tokens) const { if( waitFor <= std::chrono::milliseconds( 0 ) ) { return std::chrono::milliseconds( 0 ); } std::chrono::milliseconds timeElapsed{}; { osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds ) { timeElapsed = std::chrono::ceil( sinceStart ); }); std::unique_lock lck(m_mutex); // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations); m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]() { if( tokens.empty() ) { // wait for all operations to end return m_pendingOperations.empty(); } else if( tokens.size() == 1 ) { return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end(); } std::vector intersect{}; std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect)); return intersect.empty(); } ); } return timeElapsed; } bool ClientPaho::isOverlapping( const std::string& topic ) const { std::string existingTopic{}; return isOverlapping( topic, existingTopic ); } bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); return isOverlappingInternal( topic, existingTopic ); } std::vector ClientPaho::pendingOperations() const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::vector retval{}; retval.resize(m_pendingOperations.size()); std::copy( m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin() ); return retval; } bool ClientPaho::hasPendingSubscriptions() const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); return !m_pendingSubscriptions.empty(); } boost::optional ClientPaho::operationResult( std::int32_t token ) const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); boost::optional ret{}; auto cit = m_operationResult.find( token ); if( m_operationResult.end() != cit ) { ret = cit->second; } return ret; } void ClientPaho::parseEndpoint( const std::string& _endpoint ) { auto ep = UriParser::parse( _endpoint ); if( ep.find( "user" ) != ep.end() ) { m_username = ep["user"]; ep["user"].clear(); } if( ep.find( "password" ) != ep.end() ) { m_password = ep["password"]; ep["password"].clear(); } m_endpoint = UriParser::toString( ep ); } std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos ) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onPublishSuccess; opts.onFailure = &ClientPaho::onPublishFailure; opts.context = this; auto msg = message.toAsyncMessage(); msg.qos = qos; // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. // OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto rc = MQTTAsync_sendMessage( m_client, message.topic().c_str(), &msg, &opts ); if( MQTTASYNC_SUCCESS != rc ) { LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); } if( !m_pendingOperations.insert( opts.token ).second ) { // LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) ); } m_operationResult.erase( opts.token ); return opts.token; } std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos ) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onSubscribeSuccess; opts.onFailure = &ClientPaho::onSubscribeFailure; opts.context = this; // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto rc = MQTTAsync_subscribe( m_client, topic.c_str(), qos, &opts ); if (MQTTASYNC_SUCCESS != rc) { m_pendingSubscriptions.erase( topic ); LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); } if( !m_pendingOperations.insert( opts.token ).second ) { LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) ); } m_operationResult.erase( opts.token ); if( m_subscribeTokenToTopic.count( opts.token ) > 0 ) { LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) ); } m_subscribeTokenToTopic[opts.token] = topic; return opts.token; } void ClientPaho::setConnectionStatus( ConnectionStatus status ) { LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - " ) ); ConnectionStatus curStatus = m_connectionStatus; m_connectionStatus = status; if( status != curStatus && m_connectionStatusCallback ) { LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - Calling m_connectionStatusCallback" ) ); m_connectionStatusCallback( m_clientId, status ); } } bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const { existingTopic.clear(); for( const auto& s : m_pendingSubscriptions ) { if( testForOverlap( s.first, topic ) ) { existingTopic = s.first; return true; } } for( const auto& s : m_subscriptions ) { if( testForOverlap( s.first, topic ) ) { existingTopic = s.first; return true; } } return false; } void ClientPaho::pushIncomingEvent( std::function ev ) { m_callbackEventQueue.push( ev ); } void ClientPaho::callbackEventHandler() { LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) ); for( ;; ) { std::vector> events; if( !m_callbackEventQueue.pop(events) ) { break; } for( const auto& ev : events ) { ev(); } } LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - leaving callback event handler" ) ); } void ClientPaho::onConnectOnInstance( const std::string& cause ) { (void) cause; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::copy( m_subscriptions.begin(), m_subscriptions.end(), std::inserter( m_pendingSubscriptions, m_pendingSubscriptions.end() ) ); m_subscriptions.clear(); m_processPendingPublishes = true; // all publishes are on hold until publishPending is called. } setConnectionStatus( ConnectionStatus::Connected ); } void ClientPaho::onConnectSuccessOnInstance() { LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string( m_clientId + " - onConnectSuccessOnInstance triggered." ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Register the connect callback that is used in reconnect scenarios. auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect ); if( MQTTASYNC_SUCCESS != rc ) { LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); } // For MQTTV5 //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect); //if (MQTTASYNC_SUCCESS != rc) { // // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); //} // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); m_operationResult[-100] = true; m_pendingOperations.erase(-100); } setConnectionStatus( ConnectionStatus::Connected ); if( m_connectPromise ) { LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) ); m_connectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response ) { (void) response; LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")")); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); m_operationResult[-100] = false; m_pendingOperations.erase(-100); } if( ConnectionStatus::ConnectInProgress == m_connectionStatus ) { setConnectionStatus( ConnectionStatus::Disconnected ); } m_operationsCompleteCV.notify_all(); } //void ClientPaho::onDisconnectOnInstance(enum MQTTReasonCodes reasonCode) //{ // MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode)); //} void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& ) { LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); m_subscriptions.clear(); m_pendingSubscriptions.clear(); m_subscribeTokenToTopic.clear(); m_unsubscribeTokenToTopic.clear(); // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - pending operations : %2, removing all operations", m_clientId, m_pendingOperations); m_operationResult[-200] = true; m_pendingOperations.clear(); } setConnectionStatus( ConnectionStatus::Disconnected ); if( m_disconnectPromise ) { m_disconnectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response ) { (void) response; LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations); m_operationResult[-200] = false; m_pendingOperations.erase(-200); } if( MQTTAsync_isConnected( m_client ) ) { setConnectionStatus( ConnectionStatus::Connected ); } else { setConnectionStatus( ConnectionStatus::Disconnected ); } if( m_disconnectPromise ) { m_disconnectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response ) { auto pd = response.publishData(); // LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = true; m_pendingOperations.erase(response.token()); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response ) { LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; m_pendingOperations.erase(response.token()); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response ) { LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) ); OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); bool operationOk = false; OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]() { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = operationOk; m_pendingOperations.erase( response.token() ); }); auto it = m_subscribeTokenToTopic.find( response.token() ); if( m_subscribeTokenToTopic.end() == it ) { LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); return; } auto topic = it->second; m_subscribeTokenToTopic.erase(it); auto pendingIt = m_pendingSubscriptions.find( topic ); if( m_pendingSubscriptions.end() == pendingIt ) { LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) ); return; } if( response.qos() != pendingIt->second.qos ) { LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) ); } LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) ); m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) ); m_pendingSubscriptions.erase( pendingIt ); operationOk = true; } void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response ) { LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]() { // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; m_pendingOperations.erase( response.token() ); }); auto it = m_subscribeTokenToTopic.find( response.token() ); if( m_subscribeTokenToTopic.end() == it ) { LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); return; } auto topic = it->second; m_subscribeTokenToTopic.erase( it ); auto pendingIt = m_pendingSubscriptions.find( topic ); if( m_pendingSubscriptions.end() == pendingIt ) { LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) ); return; } LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) ); m_pendingSubscriptions.erase( pendingIt ); } void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response ) { LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) ); OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token. // As a workaround the last unsubscribe token is stored and is used when no valid token is available. // This is by no means bullet proof because rapid unsubscribes in succession will overwrite this member // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled // sequentially (see ClientPaho::unsubscribe)! auto token = response.token(); if( -1 == token ) { token = m_lastUnsubscribe; m_lastUnsubscribe = -1; } bool operationOk = false; OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]() { // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token); m_operationResult[token] = operationOk; m_pendingOperations.erase( token ); }); auto it = m_unsubscribeTokenToTopic.find( token ); if( m_unsubscribeTokenToTopic.end() == it ) { LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) ); return; } auto topic = it->second; m_unsubscribeTokenToTopic.erase( it ); auto registeredIt = m_subscriptions.find( topic ); if( m_subscriptions.end() == registeredIt ) { LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) ); return; } LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) ); m_subscriptions.erase( registeredIt ); operationOk = true; } void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response ) { LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]() { // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; m_pendingOperations.erase( response.token() ); }); auto it = m_unsubscribeTokenToTopic.find( response.token() ); if( m_unsubscribeTokenToTopic.end() == it ) { LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); return; } auto topic = it->second; m_unsubscribeTokenToTopic.erase( it ); } int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message ) { LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) ); std::function cb; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); for( const auto& s : m_subscriptions ) { if( boost::regex_match( message.topic(), s.second.topicRegex ) ) { cb = s.second.callback; } } } if( cb ) { cb( message ); } else { LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) ); } return 1; } void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token ) { LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) ); if( m_deliveryCompleteCallback ) { m_deliveryCompleteCallback( m_clientId, static_cast( token ) ); } } void ClientPaho::onConnectionLostOnInstance( const std::string& cause ) { (void) cause; // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause); setConnectionStatus( ConnectionStatus::ReconnectInProgress ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Remove all tokens related to subscriptions from the active operations. for( const auto& p : m_subscribeTokenToTopic ) { // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); m_pendingOperations.erase( p.first ); } for( const auto& p : m_unsubscribeTokenToTopic ) { // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); m_pendingOperations.erase( p.first ); } // Clear the administration used in the subscribe process. m_subscribeTokenToTopic.clear(); m_unsubscribeTokenToTopic.clear(); } // static void ClientPaho::onFirstConnect( void* context, char* cause ) { LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." ); if( context ) { auto *cl = reinterpret_cast( context ); std::string reason( nullptr == cause ? "Unknown cause" : cause ); //cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } ); cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } ); } } void ClientPaho::onConnect( void* context, char* cause ) { LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." ); if( context ) { auto* cl = reinterpret_cast( context ); std::string reason( nullptr == cause ? "unknown cause" : cause ); cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } ); } } // static void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response ) { LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." ); if( context ) { auto* cl = reinterpret_cast( context ); if( !response ) { // connect should always have a valid response struct. LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" ); return; } // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent)); cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } ); } } // static void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response ) { LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) ); if( context ) { auto* cl = reinterpret_cast( context ); MqttFailure resp( response ); cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } ); } } //// static //void ClientPaho::onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode) //{ // apply_unused_parameters(properties); // try { // if (context) { // auto* cl = reinterpret_cast(context); // cl->pushIncomingEvent([cl, reasonCode]() { cl->onDisconnectOnInstance(reasonCode); }); // } // } // catch (...) { // } // catch (const std::exception& e) { // MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - exception : %1", e.what()); // } // catch (...) { // MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - unknown exception"); // } //} // static void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response ) { if( context ) { auto* cl = reinterpret_cast( context ); MqttSuccess resp( response ? response->token : 0 ); cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } ); } } // static void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response ) { LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." ); if( context ) { auto* cl = reinterpret_cast( context ); MqttFailure resp( response ); cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } ); } } // static void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response ) { if( context ) { auto* cl = reinterpret_cast( context ); if( !response ) { // publish should always have a valid response struct. // toLogFile ("ClientPaho", "onPublishSuccess - no response data"); } MqttSuccess resp(response->token, MqttMessage(response->alt.pub.destinationName == nullptr ? "null" : response->alt.pub.destinationName, response->alt.pub.message)); cl->pushIncomingEvent([cl, resp]() { cl->onPublishSuccessOnInstance(resp); }); } } // static void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response ) { (void) response; if( context ) { auto* cl = reinterpret_cast( context ); MqttFailure resp( response ); cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } ); } } // static void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response ) { if( context ) { auto* cl = reinterpret_cast( context ); if( !response ) { // subscribe should always have a valid response struct. // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data"); } MqttSuccess resp( response->token, response->alt.qos ); cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } ); } } // static void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response ) { if( context ) { auto* cl = reinterpret_cast( context ); MqttFailure resp( response ); cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } ); } } // static void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response ) { if( context ) { auto* cl = reinterpret_cast( context ); MqttSuccess resp( response ? response->token : -1 ); cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } ); } } // static void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response ) { if( context ) { auto* cl = reinterpret_cast( context ); MqttFailure resp( response ); cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } ); } } // static int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message ) { OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]() { MQTTAsync_freeMessage( &message ); MQTTAsync_free( topicName ); }); if( context ) { auto* cl = reinterpret_cast( context ); MqttMessage msg( topicName, *message ); cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } ); } return 1; // always return true. Otherwise this callback is triggered again. } // static void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token ) { if( context ) { auto* cl = reinterpret_cast( context ); cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } ); } } // static void ClientPaho::onConnectionLost( void* context, char* cause ) { OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]() { if( cause ) { MQTTAsync_free( cause ); } }); if( context ) { auto* cl = reinterpret_cast( context ); std::string msg( nullptr == cause ? "cause unknown" : cause ); cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } ); } } // static void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message ) { (void) message; switch( level ) { case MQTTASYNC_TRACE_MAXIMUM: case MQTTASYNC_TRACE_MEDIUM: case MQTTASYNC_TRACE_MINIMUM: { // ("ClientPaho", "paho - %1", message) break; } case MQTTASYNC_TRACE_PROTOCOL: { // ("ClientPaho", "paho - %1", message) break; } case MQTTASYNC_TRACE_ERROR: case MQTTASYNC_TRACE_SEVERE: case MQTTASYNC_TRACE_FATAL: { // ("ClientPaho", "paho - %1", message) break; } } }