/* **************************************************************************** * Copyright 2019 Open Systems Development BV * * * * Permission is hereby granted, free of charge, to any person obtaining a * * copy of this software and associated documentation files (the "Software"), * * to deal in the Software without restriction, including without limitation * * the rights to use, copy, modify, merge, publish, distribute, sublicense, * * and/or sell copies of the Software, and to permit persons to whom the * * Software is furnished to do so, subject to the following conditions: * * * * The above copyright notice and this permission notice shall be included in * * all copies or substantial portions of the Software. * * * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * * DEALINGS IN THE SOFTWARE. * * ***************************************************************************/ #include "clientpaho.h" #include "errorcode.h" #include "mqttutil.h" #include "lockguard.h" #include "log.h" #include "metaprogrammingdefs.h" #include "mqttstream.h" #include "scopeguard.h" #include "uriparser.h" // std::chrono #include "compat-chrono.h" // std #include #include using namespace osdev::components::mqtt; namespace { #if defined(__clang__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-template" #endif OSDEV_COMPONENTS_HASMEMBER_TRAIT(onSuccess5) template inline typename std::enable_if::value, TRet>::type initializeMqttStruct(TRet*) { return MQTTAsync_disconnectOptions_initializer; } template inline typename std::enable_if::value, TRet>::type initializeMqttStruct(TRet*) { // For some reason g++ on centos7 evaluates the function body even when it is discarded by SFINAE. // This leads to a compile error on an undefined symbol. We will use the old initializer macro, but this // method should not be chosen when the struct does not contain member onSuccess5! // On yocto warrior mqtt-paho-c 1.3.0 the macro MQTTAsync_disconnectOptions_initializer5 is not defined. // while the struct does have an onSuccess5 member. In that case we do need correct initializer code. // We fall back to the MQTTAsync_disconnectOptions_initializer macro and initialize // additional fields ourself (which unfortunately results in a pesky compiler warning about missing field initializers). #ifndef MQTTAsync_disconnectOptions_initializer5 #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wmissing-field-initializers" TRet ret = MQTTAsync_disconnectOptions_initializer; ret.struct_version = 1; ret.onSuccess5 = nullptr; ret.onFailure5 = nullptr; return ret; #pragma GCC diagnostic pop #else return MQTTAsync_disconnectOptions_initializer5; #endif } template struct Init { static TRet initialize() { return initializeMqttStruct(static_cast(nullptr)); } }; #if defined(__clang__) #pragma GCC diagnostic pop #endif } // namespace std::atomic_int ClientPaho::s_numberOfInstances(0); ClientPaho::ClientPaho(const std::string& _endpoint, const std::string& _id, const std::function& connectionStatusCallback, const std::function& deliveryCompleteCallback) : m_mutex() , m_endpoint() , m_username() , m_password() , m_clientId(_id) , m_pendingOperations() , m_operationResult() , m_operationsCompleteCV() , m_subscriptions() , m_pendingSubscriptions() , m_subscribeTokenToTopic() , m_unsubscribeTokenToTopic() , m_pendingPublishes() , m_processPendingPublishes(false) , m_pendingPublishesReadyCV() , m_client() , m_connectionStatus(ConnectionStatus::Disconnected) , m_connectionStatusCallback(connectionStatusCallback) , m_deliveryCompleteCallback(deliveryCompleteCallback) , m_lastUnsubscribe(-1) , m_connectPromise() , m_disconnectPromise() , m_callbackEventQueue(m_clientId) , m_workerThread() { if (0 == s_numberOfInstances++) { MQTTAsync_setTraceCallback(&ClientPaho::onLogPaho); } LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho ") ); parseEndpoint(_endpoint); auto rc = MQTTAsync_create(&m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr); if (MQTTASYNC_SUCCESS == rc) { MQTTAsync_setCallbacks(m_client, reinterpret_cast(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete); m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this); } else { LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( rc ) ) ); } } ClientPaho::~ClientPaho() { LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - dtor ClientPao" ) ); if( MQTTAsync_isConnected( m_client ) ) { this->unsubscribeAll(); this->waitForCompletion(std::chrono::milliseconds(2000), std::set{}); this->disconnect(true, 5000); } else { // If the status was already disconnected this call does nothing setConnectionStatus(ConnectionStatus::Disconnected); } if (0 == --s_numberOfInstances) { // encountered a case where termination of the logging system within paho led to a segfault. // This was a paho thread that was cleaned while at the same time the logging system was terminated. // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less // frequently. MQTTAsync_setTraceCallback(nullptr); } MQTTAsync_destroy(&m_client); m_callbackEventQueue.stop(); if (m_workerThread.joinable()) { m_workerThread.join(); } } std::string ClientPaho::clientId() const { return m_clientId; } ConnectionStatus ClientPaho::connectionStatus() const { return m_connectionStatus; } std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if( ConnectionStatus::Disconnected != m_connectionStatus ) { return -1; } setConnectionStatus( ConnectionStatus::ConnectInProgress ); } LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) ); MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; conn_opts.keepAliveInterval = 5; conn_opts.cleansession = 1; conn_opts.onSuccess = nullptr; conn_opts.onFailure = &ClientPaho::onConnectFailure; conn_opts.context = this; conn_opts.automaticReconnect = 1; // Make sure we get a signal if the promise is fulfilled auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast(this), ClientPaho::onFirstConnect ); if( MQTTASYNC_SUCCESS == ccb ) { LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") ); } else { LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") ); } // Setup the last will and testament, if so desired. if( !lwt.topic().empty() ) { MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer; will_opts.message = lwt.message().c_str(); will_opts.topicName = lwt.topic().c_str(); conn_opts.will = &will_opts; LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) ); } else { conn_opts.will = nullptr; } if( !m_username.empty() ) { conn_opts.username = m_username.c_str(); } if( !m_password.empty() ) { conn_opts.password = m_password.c_str(); } std::promise waitForConnectPromise{}; auto waitForConnect = waitForConnectPromise.get_future(); m_connectPromise.reset(); if( wait ) { m_connectPromise = std::make_unique>( std::move( waitForConnectPromise ) ); } { OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); if( !m_pendingOperations.insert( -100 ).second ) { // Write something } m_operationResult.erase( -100 ); } int rc = MQTTAsync_connect( m_client, &conn_opts ); if( MQTTASYNC_SUCCESS != rc ) { setConnectionStatus( ConnectionStatus::Disconnected ); OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); m_operationResult[-100] = false; m_pendingOperations.erase(-100); } if( wait ) { waitForConnect.get(); m_connectPromise.reset(); } return -100; } std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs ) { ConnectionStatus currentStatus = m_connectionStatus; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus ) { return -1; } currentStatus = m_connectionStatus; setConnectionStatus( ConnectionStatus::DisconnectInProgress ); } MQTTAsync_disconnectOptions disconn_opts = Init::initialize(); disconn_opts.timeout = timeoutMs; disconn_opts.onSuccess = &ClientPaho::onDisconnectSuccess; disconn_opts.onFailure = &ClientPaho::onDisconnectFailure; disconn_opts.context = this; std::promise waitForDisconnectPromise{}; auto waitForDisconnect = waitForDisconnectPromise.get_future(); m_disconnectPromise.reset(); if( wait ) { m_disconnectPromise = std::make_unique>(std::move(waitForDisconnectPromise)); } { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if (!m_pendingOperations.insert(-200).second) { //"ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200) } m_operationResult.erase(-200); } int rc = MQTTAsync_disconnect(m_client, &disconn_opts); if( MQTTASYNC_SUCCESS != rc ) { if( MQTTASYNC_DISCONNECTED == rc ) { currentStatus = ConnectionStatus::Disconnected; } setConnectionStatus( currentStatus ); OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); m_operationResult[-200] = false; m_pendingOperations.erase(-200); if( MQTTASYNC_DISCONNECTED == rc ) { return -1; } // ("ClientPaho", std::string( "%1 - failed to disconnect, return code %2" ).arg( m_clientId ).arg( pahoAsyncErrorCodeToString(rc)) ); } if( wait ) { if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) { // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId); } waitForDisconnect.get(); m_disconnectPromise.reset(); } return -200; } std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) { if( ConnectionStatus::DisconnectInProgress == m_connectionStatus ) { // ("ClientPaho", "%1 - disconnect in progress, ignoring publish with qos %2 on topic %3", m_clientId, qos, message.topic()); return -1; } else if( ConnectionStatus::Disconnected == m_connectionStatus ) { // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); connect( true ); } if( !isValidTopic(message.topic() ) ) { // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, message.topic()); } if( qos > 2 ) { qos = 2; } else if( qos < 0 ) { qos = 0; } std::unique_lock lck(m_mutex); if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) { m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; }); if(ConnectionStatus::ReconnectInProgress == m_connectionStatus) { LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." ); m_pendingPublishes.push_front(Publish{ qos, message }); return -1; } } return publishInternal( message, qos ); } void ClientPaho::publishPending() { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if( !m_processPendingPublishes ) { return; } } if( ConnectionStatus::Connected != m_connectionStatus ) { LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) ) } while( !m_pendingPublishes.empty() ) { const auto& pub = m_pendingPublishes.back(); publishInternal(pub.data, pub.qos); // else ("ClientPaho", "%1 - pending publish on topic %2 failed : %3", m_clientId, pub.data.topic(), e.what()); m_pendingPublishes.pop_back(); } { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); m_processPendingPublishes = false; } m_pendingPublishesReadyCV.notify_all(); } std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function& cb ) { if( ConnectionStatus::Connected != m_connectionStatus ) { // MqttException, "Not connected" } if( !isValidTopic( topic ) ) { // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); } if( qos > 2 ) { qos = 2; } else if( qos < 0 ) { qos = 0; } { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto itExisting = m_subscriptions.find(topic); if( m_subscriptions.end() != itExisting ) { if( itExisting->second.qos == qos ) { return -1; } // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic); } auto itPending = m_pendingSubscriptions.find(topic); if( m_pendingSubscriptions.end() != itPending ) { if( itPending->second.qos == qos ) { auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair& item) { return topic == item.second; } ); if( m_subscribeTokenToTopic.end() != itToken ) { return itToken->first; } else { return -1; } } // (OverlappingTopicException, "pending subscription with same topic, but different qos", topic); } std::string existingTopic{}; if( isOverlappingInternal( topic, existingTopic ) ) { // (OverlappingTopicException, "overlapping topic", existingTopic, topic); } // ("ClientPaho", "%1 - adding subscription on topic %2 to the pending subscriptions", m_clientId, topic); m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb } ) ); } return subscribeInternal( topic, qos ); } void ClientPaho::resubscribe() { decltype(m_pendingSubscriptions) pendingSubscriptions{}; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::copy(m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end())); } for( const auto& s : pendingSubscriptions ) { subscribeInternal( s.first, s.second.qos ); } } std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) { { OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); bool found = false; for( const auto& s : m_subscriptions ) { if( topic == s.first && qos == s.second.qos ) { found = true; break; } } if( !found ) { return -1; } } MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onUnsubscribeSuccess; opts.onFailure = &ClientPaho::onUnsubscribeFailure; opts.context = this; { // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto rc = MQTTAsync_unsubscribe(m_client, topic.c_str(), &opts); if( MQTTASYNC_SUCCESS != rc ) { // ("ClientPaho", "%1 - unsubscribe on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); } if( !m_pendingOperations.insert( opts.token ).second ) { // ("ClientPaho", "%1 unsubscribe - token %2 already in use", m_clientId, opts.token); } m_operationResult.erase( opts.token ); if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 ) { // ("ClientPaho", "%1 - token already in use, replacing unsubscribe from topic %2 with topic %3", m_clientId, m_unsubscribeTokenToTopic[opts.token], topic); } m_lastUnsubscribe = opts.token; // centos7 workaround m_unsubscribeTokenToTopic[opts.token] = topic; } // Because of a bug in paho-c on centos7 the unsubscribes need to be sequential (best effort). this->waitForCompletion(std::chrono::seconds(1), std::set{ opts.token }); return opts.token; } void ClientPaho::unsubscribeAll() { decltype( m_subscriptions ) subscriptions{}; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); subscriptions = m_subscriptions; } for( const auto& s : subscriptions ) { this->unsubscribe(s.first, s.second.qos); } } std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set& tokens) const { if (waitFor <= std::chrono::milliseconds(0)) { return std::chrono::milliseconds(0); } std::chrono::milliseconds timeElapsed{}; { osdev::components::mqtt::measurement::TimeMeasurement msr("waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds) { timeElapsed = std::chrono::ceil(sinceStart); }); std::unique_lock lck(m_mutex); // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations); m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]() { if (tokens.empty()) { // wait for all operations to end return m_pendingOperations.empty(); } else if (tokens.size() == 1) { return m_pendingOperations.find(*tokens.cbegin()) == m_pendingOperations.end(); } std::vector intersect{}; std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect)); return intersect.empty(); } ); } return timeElapsed; } bool ClientPaho::isOverlapping(const std::string& topic) const { std::string existingTopic{}; return isOverlapping(topic, existingTopic); } bool ClientPaho::isOverlapping(const std::string& topic, std::string& existingTopic) const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); return isOverlappingInternal(topic, existingTopic); } std::vector ClientPaho::pendingOperations() const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::vector retval{}; retval.resize(m_pendingOperations.size()); std::copy(m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin()); return retval; } bool ClientPaho::hasPendingSubscriptions() const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); return !m_pendingSubscriptions.empty(); } boost::optional ClientPaho::operationResult(std::int32_t token) const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); boost::optional ret{}; auto cit = m_operationResult.find(token); if (m_operationResult.end() != cit) { ret = cit->second; } return ret; } void ClientPaho::parseEndpoint(const std::string& _endpoint) { auto ep = UriParser::parse(_endpoint); if (ep.find("user") != ep.end()) { m_username = ep["user"]; ep["user"].clear(); } if (ep.find("password") != ep.end()) { m_password = ep["password"]; ep["password"].clear(); } m_endpoint = UriParser::toString(ep); } std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onPublishSuccess; opts.onFailure = &ClientPaho::onPublishFailure; opts.context = this; auto msg = message.toAsyncMessage(); msg.qos = qos; // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. // OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto rc = MQTTAsync_sendMessage(m_client, message.topic().c_str(), &msg, &opts); if (MQTTASYNC_SUCCESS != rc) { // ("ClientPaho", "%1 - publish on topic %2 failed with code %3", m_clientId, message.topic(), pahoAsyncErrorCodeToString(rc)); } if (!m_pendingOperations.insert(opts.token).second) { // ("ClientPaho", "%1 publishInternal - token %2 already in use", m_clientId, opts.token); } m_operationResult.erase(opts.token); return opts.token; } std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onSubscribeSuccess; opts.onFailure = &ClientPaho::onSubscribeFailure; opts.context = this; // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto rc = MQTTAsync_subscribe(m_client, topic.c_str(), qos, &opts); if (MQTTASYNC_SUCCESS != rc) { m_pendingSubscriptions.erase(topic); // ("ClientPaho", "%1 - subscription on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); // (MqttException, "Subscription failed"); } if (!m_pendingOperations.insert(opts.token).second) { // ("ClientPaho", "%1 subscribe - token %2 already in use", m_clientId, opts.token); } m_operationResult.erase(opts.token); if (m_subscribeTokenToTopic.count(opts.token) > 0) { // ("ClientPaho", "%1 - overwriting pending subscription on topic %2 with topic %3", m_clientId, m_subscribeTokenToTopic[opts.token], topic); } m_subscribeTokenToTopic[opts.token] = topic; return opts.token; } void ClientPaho::setConnectionStatus( ConnectionStatus status ) { ConnectionStatus curStatus = m_connectionStatus; m_connectionStatus = status; if( status != curStatus && m_connectionStatusCallback ) { m_connectionStatusCallback( m_clientId, status ); } } bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const { existingTopic.clear(); for( const auto& s : m_pendingSubscriptions ) { if( testForOverlap( s.first, topic ) ) { existingTopic = s.first; return true; } } for( const auto& s : m_subscriptions ) { if( testForOverlap(s.first, topic ) ) { existingTopic = s.first; return true; } } return false; } void ClientPaho::pushIncomingEvent(std::function ev) { m_callbackEventQueue.push(ev); } void ClientPaho::callbackEventHandler() { LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler") ); for( ;; ) { std::vector> events; if( !m_callbackEventQueue.pop(events) ) { break; } for( const auto& ev : events ) { ev(); } } // ("ClientPaho", "%1 - leaving callback event handler", m_clientId); } void ClientPaho::onConnectOnInstance( const std::string& cause ) { (void)cause; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end())); m_subscriptions.clear(); m_processPendingPublishes = true; // all publishes are on hold until publishPending is called. } setConnectionStatus(ConnectionStatus::Connected); } void ClientPaho::onConnectSuccessOnInstance() { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Register the connect callback that is used in reconnect scenarios. auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect ); if( MQTTASYNC_SUCCESS != rc ) { LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); } // For MQTTV5 //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect); //if (MQTTASYNC_SUCCESS != rc) { // // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); //} // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); m_operationResult[-100] = true; m_pendingOperations.erase(-100); } setConnectionStatus( ConnectionStatus::Connected ); if( m_connectPromise ) { LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!") ); m_connectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response ) { (void) response; LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")")); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); m_operationResult[-100] = false; m_pendingOperations.erase(-100); } if (ConnectionStatus::ConnectInProgress == m_connectionStatus) { setConnectionStatus(ConnectionStatus::Disconnected); } m_operationsCompleteCV.notify_all(); } //void ClientPaho::onDisconnectOnInstance(enum MQTTReasonCodes reasonCode) //{ // MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode)); //} void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&) { // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - disconnected from endpoint %2", m_clientId, m_endpoint); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); m_subscriptions.clear(); m_pendingSubscriptions.clear(); m_subscribeTokenToTopic.clear(); m_unsubscribeTokenToTopic.clear(); // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - pending operations : %2, removing all operations", m_clientId, m_pendingOperations); m_operationResult[-200] = true; m_pendingOperations.clear(); } setConnectionStatus( ConnectionStatus::Disconnected ); if( m_disconnectPromise ) { m_disconnectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response ) { (void) response; // ("ClientPaho", "onDisconnectFailureOnInstance %1 - disconnect failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations); m_operationResult[-200] = false; m_pendingOperations.erase(-200); } if( MQTTAsync_isConnected( m_client ) ) { setConnectionStatus( ConnectionStatus::Connected ); } else { setConnectionStatus( ConnectionStatus::Disconnected ); } if( m_disconnectPromise ) { m_disconnectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response ) { auto pd = response.publishData(); // ("ClientPaho", "onPublishSuccessOnInstance %1 - publish with token %2 succeeded (message was %3)", m_clientId, response.token(), pd.payload()); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = true; m_pendingOperations.erase(response.token()); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response ) { // ("ClientPaho", "onPublishFailureOnInstance %1 - publish with token %2 failed with code %3 (%4)", m_clientId, response.token(), response.codeToString(), response.message()); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; m_pendingOperations.erase(response.token()); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response ) { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscribe with token %2 succeeded", m_clientId, response.token()); OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); bool operationOk = false; OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response, &operationOk]() { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = operationOk; m_pendingOperations.erase(response.token()); }); auto it = m_subscribeTokenToTopic.find(response.token()); if (m_subscribeTokenToTopic.end() == it) { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, response.token()); return; } auto topic = it->second; m_subscribeTokenToTopic.erase(it); auto pendingIt = m_pendingSubscriptions.find(topic); if (m_pendingSubscriptions.end() == pendingIt) { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); return; } if (response.qos() != pendingIt->second.qos) { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscription requested qos %2, endpoint assigned qos %3", m_clientId, pendingIt->second.qos, response.qos()); } // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - move pending subscription on topic %2 to the registered subscriptions", m_clientId, topic); m_subscriptions.emplace(std::make_pair(pendingIt->first, std::move(pendingIt->second))); m_pendingSubscriptions.erase(pendingIt); operationOk = true; } void ClientPaho::onSubscribeFailureOnInstance(const MqttFailure& response) { // ("ClientPaho", "onSubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() { // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; m_pendingOperations.erase(response.token()); }); auto it = m_subscribeTokenToTopic.find(response.token()); if (m_subscribeTokenToTopic.end() == it) { // ("ClientPaho", "onSubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); return; } auto topic = it->second; m_subscribeTokenToTopic.erase(it); auto pendingIt = m_pendingSubscriptions.find(topic); if (m_pendingSubscriptions.end() == pendingIt) { // ("ClientPaho", "onSubscribeFailureOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); return; } // ("ClientPaho", "onSubscribeFailureOnInstance %1 - remove pending subscription on topic %2", m_clientId, topic); m_pendingSubscriptions.erase(pendingIt); } void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response) { // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unsubscribe with token %2 succeeded", m_clientId, response.token()); OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token. // As a workaround the last unsubscribe token is stored and is used when no valid token is available. // This is by no means bullet proof because rapid unsubscribes in succession will overwrite this member // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled // sequentially (see ClientPaho::unsubscribe)! auto token = response.token(); if (-1 == token) { token = m_lastUnsubscribe; m_lastUnsubscribe = -1; } bool operationOk = false; OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, token, &operationOk]() { // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token); m_operationResult[token] = operationOk; m_pendingOperations.erase(token); }); auto it = m_unsubscribeTokenToTopic.find(token); if (m_unsubscribeTokenToTopic.end() == it) { // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, token); return; } auto topic = it->second; m_unsubscribeTokenToTopic.erase(it); auto registeredIt = m_subscriptions.find(topic); if (m_subscriptions.end() == registeredIt) { // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - cannot find subscription for token %2", m_clientId, response.token()); return; } // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - remove subscription on topic %2 from the registered subscriptions", m_clientId, topic); m_subscriptions.erase(registeredIt); operationOk = true; } void ClientPaho::onUnsubscribeFailureOnInstance(const MqttFailure& response) { // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() { // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; m_pendingOperations.erase(response.token()); }); auto it = m_unsubscribeTokenToTopic.find(response.token()); if (m_unsubscribeTokenToTopic.end() == it) { // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); return; } auto topic = it->second; m_unsubscribeTokenToTopic.erase(it); } int ClientPaho::onMessageArrivedOnInstance(const MqttMessage& message) { // ("ClientPaho", "onMessageArrivedOnInstance %1 - received message on topic %2, retained : %3, dup : %4", m_clientId, message.topic(), message.retained(), message.duplicate()); std::function cb; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); for (const auto& s : m_subscriptions) { if (boost::regex_match(message.topic(), s.second.topicRegex)) { cb = s.second.callback; } } } if (cb) { cb(message); } else { // ("ClientPaho", "onMessageArrivedOnInstance %1 - no topic filter found for message received on topic %2", m_clientId, message.topic()); } return 1; } void ClientPaho::onDeliveryCompleteOnInstance(MQTTAsync_token token) { // ("ClientPaho", "onDeliveryCompleteOnInstance %1 - message with token %2 is delivered", m_clientId, token); if (m_deliveryCompleteCallback) { m_deliveryCompleteCallback(m_clientId, static_cast(token)); } } void ClientPaho::onConnectionLostOnInstance(const std::string& cause) { (void)cause; // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause); setConnectionStatus(ConnectionStatus::ReconnectInProgress); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Remove all tokens related to subscriptions from the active operations. for (const auto& p : m_subscribeTokenToTopic) { // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); m_pendingOperations.erase(p.first); } for (const auto& p : m_unsubscribeTokenToTopic) { // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); m_pendingOperations.erase(p.first); } // Clear the administration used in the subscribe process. m_subscribeTokenToTopic.clear(); m_unsubscribeTokenToTopic.clear(); } // static void ClientPaho::onFirstConnect(void* context, char* cause) { LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." ); if(context) { auto *cl = reinterpret_cast(context); std::string reason(nullptr == cause ? "Unknown cause" : cause); cl->pushIncomingEvent([cl, reason]() { cl->onConnectSuccessOnInstance(); }); } } void ClientPaho::onConnect(void* context, char* cause) { LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." ); if (context) { auto* cl = reinterpret_cast(context); std::string reason(nullptr == cause ? "unknown cause" : cause); cl->pushIncomingEvent([cl, reason]() { cl->onConnectOnInstance(reason); }); } } // static void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response) { LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." ); if (context) { auto* cl = reinterpret_cast(context); if (!response) { // connect should always have a valid response struct. LogError( "[ClientPaho]", "onConnectSuccess - no response data"); return; } // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent)); cl->pushIncomingEvent([cl]() { cl->onConnectSuccessOnInstance(); }); } } // static void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response) { LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" )); if (context) { auto* cl = reinterpret_cast(context); MqttFailure resp(response); cl->pushIncomingEvent([cl, resp]() { cl->onConnectFailureOnInstance(resp); }); } } //// static //void ClientPaho::onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode) //{ // apply_unused_parameters(properties); // try { // if (context) { // auto* cl = reinterpret_cast(context); // cl->pushIncomingEvent([cl, reasonCode]() { cl->onDisconnectOnInstance(reasonCode); }); // } // } // catch (...) { // } // catch (const std::exception& e) { // MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - exception : %1", e.what()); // } // catch (...) { // MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - unknown exception"); // } //} // static void ClientPaho::onDisconnectSuccess(void* context, MQTTAsync_successData* response) { if (context) { auto* cl = reinterpret_cast(context); MqttSuccess resp(response ? response->token : 0); cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectSuccessOnInstance(resp); }); } } // static void ClientPaho::onDisconnectFailure(void* context, MQTTAsync_failureData* response) { LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." ); if (context) { auto* cl = reinterpret_cast(context); MqttFailure resp(response); cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectFailureOnInstance(resp); }); } } // static void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response) { if (context) { auto* cl = reinterpret_cast(context); if (!response) { // publish should always have a valid response struct. // toLogFile ("ClientPaho", "onPublishSuccess - no response data"); } MqttSuccess resp(response->token, MqttMessage(response->alt.pub.destinationName == nullptr ? "null" : response->alt.pub.destinationName, response->alt.pub.message)); cl->pushIncomingEvent([cl, resp]() { cl->onPublishSuccessOnInstance(resp); }); } } // static void ClientPaho::onPublishFailure(void* context, MQTTAsync_failureData* response) { (void)response; if (context) { auto* cl = reinterpret_cast(context); MqttFailure resp(response); cl->pushIncomingEvent([cl, resp]() { cl->onPublishFailureOnInstance(resp); }); } } // static void ClientPaho::onSubscribeSuccess(void* context, MQTTAsync_successData* response) { if (context) { auto* cl = reinterpret_cast(context); if (!response) { // subscribe should always have a valid response struct. // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data"); } MqttSuccess resp(response->token, response->alt.qos); cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeSuccessOnInstance(resp); }); } } // static void ClientPaho::onSubscribeFailure(void* context, MQTTAsync_failureData* response) { if (context) { auto* cl = reinterpret_cast(context); MqttFailure resp(response); cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeFailureOnInstance(resp); }); } } // static void ClientPaho::onUnsubscribeSuccess(void* context, MQTTAsync_successData* response) { if (context) { auto* cl = reinterpret_cast(context); MqttSuccess resp(response ? response->token : -1); cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeSuccessOnInstance(resp); }); } } // static void ClientPaho::onUnsubscribeFailure(void* context, MQTTAsync_failureData* response) { if (context) { auto* cl = reinterpret_cast(context); MqttFailure resp(response); cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeFailureOnInstance(resp); }); } } // static int ClientPaho::onMessageArrived(void* context, char* topicName, int, MQTTAsync_message* message) { OSDEV_COMPONENTS_SCOPEGUARD(freeMessage, [&topicName, &message]() { MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); }); if (context) { auto* cl = reinterpret_cast(context); MqttMessage msg(topicName, *message); cl->pushIncomingEvent([cl, msg]() { cl->onMessageArrivedOnInstance(msg); }); } return 1; // always return true. Otherwise this callback is triggered again. } // static void ClientPaho::onDeliveryComplete(void* context, MQTTAsync_token token) { if (context) { auto* cl = reinterpret_cast(context); cl->pushIncomingEvent([cl, token]() { cl->onDeliveryCompleteOnInstance(token); }); } } // static void ClientPaho::onConnectionLost(void* context, char* cause) { OSDEV_COMPONENTS_SCOPEGUARD(freeCause, [&cause]() { if (cause) { MQTTAsync_free(cause); } }); if (context) { auto* cl = reinterpret_cast(context); std::string msg(nullptr == cause ? "cause unknown" : cause); cl->pushIncomingEvent([cl, msg]() { cl->onConnectionLostOnInstance(msg); }); } } // static void ClientPaho::onLogPaho(enum MQTTASYNC_TRACE_LEVELS level, char* message) { (void)message; switch (level) { case MQTTASYNC_TRACE_MAXIMUM: case MQTTASYNC_TRACE_MEDIUM: case MQTTASYNC_TRACE_MINIMUM: { // ("ClientPaho", "paho - %1", message) break; } case MQTTASYNC_TRACE_PROTOCOL: { // ("ClientPaho", "paho - %1", message) break; } case MQTTASYNC_TRACE_ERROR: case MQTTASYNC_TRACE_SEVERE: case MQTTASYNC_TRACE_FATAL: { // ("ClientPaho", "paho - %1", message) break; } } }