/* Copyright (C) 2019 * * This file is part of the osdev components suite * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the * Free Software Foundation; either version 2, or (at your option) any * later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA */ #include "clientpaho.h" #include "errorcode.h" #include "mqttutil.h" #include "lockguard.h" #include "metaprogrammingdefs.h" #include "mqttstream.h" #include "scopeguard.h" #include "uriparser.h" // std::chrono #include "compat-chrono.h" // std #include #include using namespace osdev::components::mqtt; namespace { #if defined(__clang__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-template" #endif OSDEV_COMPONENTS_HASMEMBER_TRAIT(onSuccess5) template inline typename std::enable_if::value, TRet>::type initializeMqttStruct(TRet*) { return MQTTAsync_disconnectOptions_initializer; } template inline typename std::enable_if::value, TRet>::type initializeMqttStruct(TRet*) { // For some reason g++ on centos7 evaluates the function body even when it is discarded by SFINAE. // This leads to a compile error on an undefined symbol. We will use the old initializer macro, but this // method should not be chosen when the struct does not contain member onSuccess5! // On yocto warrior mqtt-paho-c 1.3.0 the macro MQTTAsync_disconnectOptions_initializer5 is not defined. // while the struct does have an onSuccess5 member. In that case we do need correct initializer code. // We fall back to the MQTTAsync_disconnectOptions_initializer macro and initialize // additional fields ourself (which unfortunately results in a pesky compiler warning about missing field initializers). #ifndef MQTTAsync_disconnectOptions_initializer5 #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wmissing-field-initializers" TRet ret = MQTTAsync_disconnectOptions_initializer; ret.struct_version = 1; ret.onSuccess5 = nullptr; ret.onFailure5 = nullptr; return ret; #pragma GCC diagnostic pop #else return MQTTAsync_disconnectOptions_initializer5; #endif } template struct Init { static TRet initialize() { return initializeMqttStruct(static_cast(nullptr)); } }; #if defined(__clang__) #pragma GCC diagnostic pop #endif } // namespace std::atomic_int ClientPaho::s_numberOfInstances(0); ClientPaho::ClientPaho(const std::string& _endpoint, const std::string& _id, const std::function& connectionStatusCallback, const std::function& deliveryCompleteCallback) : m_mutex() , m_endpoint() , m_username() , m_password() , m_clientId(_id) , m_pendingOperations() , m_operationResult() , m_operationsCompleteCV() , m_subscriptions() , m_pendingSubscriptions() , m_subscribeTokenToTopic() , m_unsubscribeTokenToTopic() , m_pendingPublishes() , m_processPendingPublishes(false) , m_pendingPublishesReadyCV() , m_client() , m_connectionStatus(ConnectionStatus::Disconnected) , m_connectionStatusCallback(connectionStatusCallback) , m_deliveryCompleteCallback(deliveryCompleteCallback) , m_lastUnsubscribe(-1) , m_connectPromise() , m_disconnectPromise() , m_callbackEventQueue(m_clientId) , m_workerThread() { if (0 == s_numberOfInstances++) { MQTTAsync_setTraceCallback(&ClientPaho::onLogPaho); } // MLOGIC_COMMON_DEBUG("ClientPaho", "%1 - ctor ClientPaho %2", m_clientId, this); parseEndpoint(_endpoint); auto rc = MQTTAsync_create(&m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr); if (MQTTASYNC_SUCCESS == rc) { MQTTAsync_setCallbacks(m_client, reinterpret_cast(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete); m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this); } else { // Do something sensible here. } } ClientPaho::~ClientPaho() { if( MQTTAsync_isConnected( m_client ) ) { this->unsubscribeAll(); this->waitForCompletion(std::chrono::milliseconds(2000), std::set{}); this->disconnect(true, 5000); } else { // If the status was already disconnected this call does nothing setConnectionStatus(ConnectionStatus::Disconnected); } if (0 == --s_numberOfInstances) { // encountered a case where termination of the logging system within paho led to a segfault. // This was a paho thread that was cleaned while at the same time the logging system was terminated. // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less // frequently. MQTTAsync_setTraceCallback(nullptr); } MQTTAsync_destroy(&m_client); m_callbackEventQueue.stop(); if (m_workerThread.joinable()) { m_workerThread.join(); } } std::string ClientPaho::clientId() const { return m_clientId; } ConnectionStatus ClientPaho::connectionStatus() const { return m_connectionStatus; } std::int32_t ClientPaho::connect(bool wait) { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if (ConnectionStatus::Disconnected != m_connectionStatus) { return -1; } setConnectionStatus(ConnectionStatus::ConnectInProgress); } MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.onSuccess = &ClientPaho::onConnectSuccess; conn_opts.onFailure = &ClientPaho::onConnectFailure; conn_opts.context = this; conn_opts.automaticReconnect = 1; if (!m_username.empty()) { conn_opts.username = m_username.c_str(); } if (!m_password.empty()) { conn_opts.password = m_password.c_str(); } std::promise waitForConnectPromise{}; auto waitForConnect = waitForConnectPromise.get_future(); m_connectPromise.reset(); if (wait) { m_connectPromise = std::make_unique>(std::move(waitForConnectPromise)); } { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if (!m_pendingOperations.insert(-100).second) { // Write something } m_operationResult.erase(-100); } int rc = MQTTAsync_connect(m_client, &conn_opts); if (MQTTASYNC_SUCCESS != rc) { setConnectionStatus(ConnectionStatus::Disconnected); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); m_operationResult[-100] = false; m_pendingOperations.erase(-100); } if (wait) { waitForConnect.get(); m_connectPromise.reset(); } return -100; } std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) { ConnectionStatus currentStatus = m_connectionStatus; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if (ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus) { return -1; } currentStatus = m_connectionStatus; setConnectionStatus(ConnectionStatus::DisconnectInProgress); } MQTTAsync_disconnectOptions disconn_opts = Init::initialize(); disconn_opts.timeout = timeoutMs; disconn_opts.onSuccess = &ClientPaho::onDisconnectSuccess; disconn_opts.onFailure = &ClientPaho::onDisconnectFailure; disconn_opts.context = this; std::promise waitForDisconnectPromise{}; auto waitForDisconnect = waitForDisconnectPromise.get_future(); m_disconnectPromise.reset(); if (wait) { m_disconnectPromise = std::make_unique>(std::move(waitForDisconnectPromise)); } { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if (!m_pendingOperations.insert(-200).second) { // "ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200) } m_operationResult.erase(-200); } int rc = MQTTAsync_disconnect(m_client, &disconn_opts); if (MQTTASYNC_SUCCESS != rc) { if (MQTTASYNC_DISCONNECTED == rc) { currentStatus = ConnectionStatus::Disconnected; } setConnectionStatus(currentStatus); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); m_operationResult[-200] = false; m_pendingOperations.erase(-200); if (MQTTASYNC_DISCONNECTED == rc) { return -1; } // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); } if (wait) { if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) { // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId); } waitForDisconnect.get(); m_disconnectPromise.reset(); } return -200; } std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) { if (ConnectionStatus::DisconnectInProgress == m_connectionStatus) { // ("ClientPaho", "%1 - disconnect in progress, ignoring publish with qos %2 on topic %3", m_clientId, qos, message.topic()); return -1; } else if (ConnectionStatus::Disconnected == m_connectionStatus) { // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); } if (!isValidTopic(message.topic())) { // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, message.topic()); } if (qos > 2) { qos = 2; } else if (qos < 0) { qos = 0; } std::unique_lock lck(m_mutex); if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) { m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; }); if (ConnectionStatus::ReconnectInProgress == m_connectionStatus) { // ("ClientPaho", "Adding publish to pending queue."); m_pendingPublishes.push_front(Publish{ qos, message }); return -1; } } return publishInternal(message, qos); } void ClientPaho::publishPending() { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if (!m_processPendingPublishes) { return; } } if (ConnectionStatus::Connected != m_connectionStatus) { // MqttException, "Not connected"); } while (!m_pendingPublishes.empty()) { const auto& pub = m_pendingPublishes.back(); publishInternal(pub.data, pub.qos); // else ("ClientPaho", "%1 - pending publish on topic %2 failed : %3", m_clientId, pub.data.topic(), e.what()); m_pendingPublishes.pop_back(); } { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); m_processPendingPublishes = false; } m_pendingPublishesReadyCV.notify_all(); } std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std::function& cb) { if (ConnectionStatus::Connected != m_connectionStatus) { // MqttException, "Not connected" } if (!isValidTopic(topic)) { // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); } if (qos > 2) { qos = 2; } else if (qos < 0) { qos = 0; } { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto itExisting = m_subscriptions.find(topic); if (m_subscriptions.end() != itExisting) { if (itExisting->second.qos == qos) { return -1; } // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic); } auto itPending = m_pendingSubscriptions.find(topic); if (m_pendingSubscriptions.end() != itPending) { if (itPending->second.qos == qos) { auto itToken = std::find_if(m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair& item) { return topic == item.second; }); if (m_subscribeTokenToTopic.end() != itToken) { return itToken->first; } else { return -1; } } // (OverlappingTopicException, "pending subscription with same topic, but different qos", topic); } std::string existingTopic{}; if (isOverlappingInternal(topic, existingTopic)) { // (OverlappingTopicException, "overlapping topic", existingTopic, topic); } // ("ClientPaho", "%1 - adding subscription on topic %2 to the pending subscriptions", m_clientId, topic); m_pendingSubscriptions.emplace(std::make_pair(topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb })); } return subscribeInternal(topic, qos); } void ClientPaho::resubscribe() { decltype(m_pendingSubscriptions) pendingSubscriptions{}; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::copy(m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end())); } for (const auto& s : pendingSubscriptions) { subscribeInternal(s.first, s.second.qos); } } std::int32_t ClientPaho::unsubscribe(const std::string& topic, int qos) { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); bool found = false; for (const auto& s : m_subscriptions) { if (topic == s.first && qos == s.second.qos) { found = true; break; } } if (!found) { return -1; } } MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onUnsubscribeSuccess; opts.onFailure = &ClientPaho::onUnsubscribeFailure; opts.context = this; { // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto rc = MQTTAsync_unsubscribe(m_client, topic.c_str(), &opts); if (MQTTASYNC_SUCCESS != rc) { // ("ClientPaho", "%1 - unsubscribe on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); } if (!m_pendingOperations.insert(opts.token).second) { // ("ClientPaho", "%1 unsubscribe - token %2 already in use", m_clientId, opts.token); } m_operationResult.erase(opts.token); if (m_unsubscribeTokenToTopic.count(opts.token) > 0) { // ("ClientPaho", "%1 - token already in use, replacing unsubscribe from topic %2 with topic %3", m_clientId, m_unsubscribeTokenToTopic[opts.token], topic); } m_lastUnsubscribe = opts.token; // centos7 workaround m_unsubscribeTokenToTopic[opts.token] = topic; } // Because of a bug in paho-c on centos7 the unsubscribes need to be sequential (best effort). this->waitForCompletion(std::chrono::seconds(1), std::set{ opts.token }); return opts.token; } void ClientPaho::unsubscribeAll() { decltype(m_subscriptions) subscriptions{}; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); subscriptions = m_subscriptions; } for (const auto& s : subscriptions) { this->unsubscribe(s.first, s.second.qos); } } std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set& tokens) const { if (waitFor <= std::chrono::milliseconds(0)) { return std::chrono::milliseconds(0); } std::chrono::milliseconds timeElapsed{}; { osdev::components::mqtt::measurement::TimeMeasurement msr("waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds) { timeElapsed = std::chrono::ceil(sinceStart); }); std::unique_lock lck(m_mutex); // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations); m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]() { if (tokens.empty()) { // wait for all operations to end return m_pendingOperations.empty(); } else if (tokens.size() == 1) { return m_pendingOperations.find(*tokens.cbegin()) == m_pendingOperations.end(); } std::vector intersect{}; std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect)); return intersect.empty(); } ); } return timeElapsed; } bool ClientPaho::isOverlapping(const std::string& topic) const { std::string existingTopic{}; return isOverlapping(topic, existingTopic); } bool ClientPaho::isOverlapping(const std::string& topic, std::string& existingTopic) const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); return isOverlappingInternal(topic, existingTopic); } std::vector ClientPaho::pendingOperations() const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::vector retval{}; retval.resize(m_pendingOperations.size()); std::copy(m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin()); return retval; } bool ClientPaho::hasPendingSubscriptions() const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); return !m_pendingSubscriptions.empty(); } boost::optional ClientPaho::operationResult(std::int32_t token) const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); boost::optional ret{}; auto cit = m_operationResult.find(token); if (m_operationResult.end() != cit) { ret = cit->second; } return ret; } void ClientPaho::parseEndpoint(const std::string& _endpoint) { auto ep = UriParser::parse(_endpoint); if (ep.find("user") != ep.end()) { m_username = ep["user"]; ep["user"].clear(); } if (ep.find("password") != ep.end()) { m_password = ep["password"]; ep["password"].clear(); } m_endpoint = UriParser::toString(ep); } std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onPublishSuccess; opts.onFailure = &ClientPaho::onPublishFailure; opts.context = this; auto msg = message.toAsyncMessage(); msg.qos = qos; // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. // OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto rc = MQTTAsync_sendMessage(m_client, message.topic().c_str(), &msg, &opts); if (MQTTASYNC_SUCCESS != rc) { // ("ClientPaho", "%1 - publish on topic %2 failed with code %3", m_clientId, message.topic(), pahoAsyncErrorCodeToString(rc)); } if (!m_pendingOperations.insert(opts.token).second) { // ("ClientPaho", "%1 publishInternal - token %2 already in use", m_clientId, opts.token); } m_operationResult.erase(opts.token); return opts.token; } std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onSubscribeSuccess; opts.onFailure = &ClientPaho::onSubscribeFailure; opts.context = this; // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto rc = MQTTAsync_subscribe(m_client, topic.c_str(), qos, &opts); if (MQTTASYNC_SUCCESS != rc) { m_pendingSubscriptions.erase(topic); // ("ClientPaho", "%1 - subscription on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); // (MqttException, "Subscription failed"); } if (!m_pendingOperations.insert(opts.token).second) { // ("ClientPaho", "%1 subscribe - token %2 already in use", m_clientId, opts.token); } m_operationResult.erase(opts.token); if (m_subscribeTokenToTopic.count(opts.token) > 0) { // ("ClientPaho", "%1 - overwriting pending subscription on topic %2 with topic %3", m_clientId, m_subscribeTokenToTopic[opts.token], topic); } m_subscribeTokenToTopic[opts.token] = topic; return opts.token; } void ClientPaho::setConnectionStatus(ConnectionStatus status) { ConnectionStatus curStatus = m_connectionStatus; m_connectionStatus = status; if (status != curStatus && m_connectionStatusCallback) { m_connectionStatusCallback(m_clientId, status); } } bool ClientPaho::isOverlappingInternal(const std::string& topic, std::string& existingTopic) const { existingTopic.clear(); for (const auto& s : m_pendingSubscriptions) { if (testForOverlap(s.first, topic)) { existingTopic = s.first; return true; } } for (const auto& s : m_subscriptions) { if (testForOverlap(s.first, topic)) { existingTopic = s.first; return true; } } return false; } void ClientPaho::pushIncomingEvent(std::function ev) { m_callbackEventQueue.push(ev); } void ClientPaho::callbackEventHandler() { // ("ClientPaho", "%1 - starting callback event handler", m_clientId); for (;;) { std::vector> events; if (!m_callbackEventQueue.pop(events)) { break; } for (const auto& ev : events) { ev(); // ("ClientPaho", "%1 - Exception occurred: %2", m_clientId, mlogicException); } } // ("ClientPaho", "%1 - leaving callback event handler", m_clientId); } void ClientPaho::onConnectOnInstance(const std::string& cause) { (void)cause; // toLogFile ("ClientPaho", "onConnectOnInstance %1 - reconnected (cause %2)", m_clientId, cause); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end())); m_subscriptions.clear(); m_processPendingPublishes = true; // all publishes are on hold until publishPending is called. } setConnectionStatus(ConnectionStatus::Connected); } void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response) { auto connectData = response.connectionData(); // ("ClientPaho", "onConnectSuccessOnInstance %1 - connected to endpoint %2 (mqtt version %3, session present %4)", // m_clientId, connectData.serverUri(), connectData.mqttVersion(), connectData.sessionPresent()); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Register the connect callback that is used in reconnect scenarios. auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect); if (MQTTASYNC_SUCCESS != rc) { // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the connected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); } // For MQTTV5 //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect); //if (MQTTASYNC_SUCCESS != rc) { // // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); //} // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); m_operationResult[-100] = true; m_pendingOperations.erase(-100); } setConnectionStatus(ConnectionStatus::Connected); if (m_connectPromise) { m_connectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) { (void)response; // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); if (m_connectPromise) { m_connectPromise->set_value(); } // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); m_operationResult[-100] = false; m_pendingOperations.erase(-100); } if (ConnectionStatus::ConnectInProgress == m_connectionStatus) { setConnectionStatus(ConnectionStatus::Disconnected); } m_operationsCompleteCV.notify_all(); } //void ClientPaho::onDisconnectOnInstance(enum MQTTReasonCodes reasonCode) //{ // MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode)); //} void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&) { // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - disconnected from endpoint %2", m_clientId, m_endpoint); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); m_subscriptions.clear(); m_pendingSubscriptions.clear(); m_subscribeTokenToTopic.clear(); m_unsubscribeTokenToTopic.clear(); // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - pending operations : %2, removing all operations", m_clientId, m_pendingOperations); m_operationResult[-200] = true; m_pendingOperations.clear(); } setConnectionStatus(ConnectionStatus::Disconnected); if (m_disconnectPromise) { m_disconnectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onDisconnectFailureOnInstance(const MqttFailure& response) { (void)response; // ("ClientPaho", "onDisconnectFailureOnInstance %1 - disconnect failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations); m_operationResult[-200] = false; m_pendingOperations.erase(-200); } if (MQTTAsync_isConnected(m_client)) { setConnectionStatus(ConnectionStatus::Connected); } else { setConnectionStatus(ConnectionStatus::Disconnected); } if (m_disconnectPromise) { m_disconnectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess& response) { auto pd = response.publishData(); // ("ClientPaho", "onPublishSuccessOnInstance %1 - publish with token %2 succeeded (message was %3)", m_clientId, response.token(), pd.payload()); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = true; m_pendingOperations.erase(response.token()); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onPublishFailureOnInstance(const MqttFailure& response) { // ("ClientPaho", "onPublishFailureOnInstance %1 - publish with token %2 failed with code %3 (%4)", m_clientId, response.token(), response.codeToString(), response.message()); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; m_pendingOperations.erase(response.token()); } m_operationsCompleteCV.notify_all(); } void ClientPaho::onSubscribeSuccessOnInstance(const MqttSuccess& response) { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscribe with token %2 succeeded", m_clientId, response.token()); OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); bool operationOk = false; OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response, &operationOk]() { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = operationOk; m_pendingOperations.erase(response.token()); }); auto it = m_subscribeTokenToTopic.find(response.token()); if (m_subscribeTokenToTopic.end() == it) { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, response.token()); return; } auto topic = it->second; m_subscribeTokenToTopic.erase(it); auto pendingIt = m_pendingSubscriptions.find(topic); if (m_pendingSubscriptions.end() == pendingIt) { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); return; } if (response.qos() != pendingIt->second.qos) { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscription requested qos %2, endpoint assigned qos %3", m_clientId, pendingIt->second.qos, response.qos()); } // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - move pending subscription on topic %2 to the registered subscriptions", m_clientId, topic); m_subscriptions.emplace(std::make_pair(pendingIt->first, std::move(pendingIt->second))); m_pendingSubscriptions.erase(pendingIt); operationOk = true; } void ClientPaho::onSubscribeFailureOnInstance(const MqttFailure& response) { // ("ClientPaho", "onSubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() { // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; m_pendingOperations.erase(response.token()); }); auto it = m_subscribeTokenToTopic.find(response.token()); if (m_subscribeTokenToTopic.end() == it) { // ("ClientPaho", "onSubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); return; } auto topic = it->second; m_subscribeTokenToTopic.erase(it); auto pendingIt = m_pendingSubscriptions.find(topic); if (m_pendingSubscriptions.end() == pendingIt) { // ("ClientPaho", "onSubscribeFailureOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); return; } // ("ClientPaho", "onSubscribeFailureOnInstance %1 - remove pending subscription on topic %2", m_clientId, topic); m_pendingSubscriptions.erase(pendingIt); } void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response) { // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unsubscribe with token %2 succeeded", m_clientId, response.token()); OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token. // As a workaround the last unsubscribe token is stored and is used when no valid token is available. // This is by no means bullet proof because rapid unsubscribes in succession will overwrite this member // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled // sequentially (see ClientPaho::unsubscribe)! auto token = response.token(); if (-1 == token) { token = m_lastUnsubscribe; m_lastUnsubscribe = -1; } bool operationOk = false; OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, token, &operationOk]() { // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token); m_operationResult[token] = operationOk; m_pendingOperations.erase(token); }); auto it = m_unsubscribeTokenToTopic.find(token); if (m_unsubscribeTokenToTopic.end() == it) { // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, token); return; } auto topic = it->second; m_unsubscribeTokenToTopic.erase(it); auto registeredIt = m_subscriptions.find(topic); if (m_subscriptions.end() == registeredIt) { // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - cannot find subscription for token %2", m_clientId, response.token()); return; } // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - remove subscription on topic %2 from the registered subscriptions", m_clientId, topic); m_subscriptions.erase(registeredIt); operationOk = true; } void ClientPaho::onUnsubscribeFailureOnInstance(const MqttFailure& response) { // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() { // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; m_pendingOperations.erase(response.token()); }); auto it = m_unsubscribeTokenToTopic.find(response.token()); if (m_unsubscribeTokenToTopic.end() == it) { // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); return; } auto topic = it->second; m_unsubscribeTokenToTopic.erase(it); } int ClientPaho::onMessageArrivedOnInstance(const MqttMessage& message) { // ("ClientPaho", "onMessageArrivedOnInstance %1 - received message on topic %2, retained : %3, dup : %4", m_clientId, message.topic(), message.retained(), message.duplicate()); std::function cb; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); for (const auto& s : m_subscriptions) { if (boost::regex_match(message.topic(), s.second.topicRegex)) { cb = s.second.callback; } } } if (cb) { cb(message); } else { // ("ClientPaho", "onMessageArrivedOnInstance %1 - no topic filter found for message received on topic %2", m_clientId, message.topic()); } return 1; } void ClientPaho::onDeliveryCompleteOnInstance(MQTTAsync_token token) { // ("ClientPaho", "onDeliveryCompleteOnInstance %1 - message with token %2 is delivered", m_clientId, token); if (m_deliveryCompleteCallback) { m_deliveryCompleteCallback(m_clientId, static_cast(token)); } } void ClientPaho::onConnectionLostOnInstance(const std::string& cause) { (void)cause; // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause); setConnectionStatus(ConnectionStatus::ReconnectInProgress); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Remove all tokens related to subscriptions from the active operations. for (const auto& p : m_subscribeTokenToTopic) { // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); m_pendingOperations.erase(p.first); } for (const auto& p : m_unsubscribeTokenToTopic) { // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); m_pendingOperations.erase(p.first); } // Clear the administration used in the subscribe process. m_subscribeTokenToTopic.clear(); m_unsubscribeTokenToTopic.clear(); } // static void ClientPaho::onConnect(void* context, char* cause) { if (context) { auto* cl = reinterpret_cast(context); std::string reason(nullptr == cause ? "unknown cause" : cause); cl->pushIncomingEvent([cl, reason]() { cl->onConnectOnInstance(reason); }); } } // static void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response) { if (context) { auto* cl = reinterpret_cast(context); if (!response) { // connect should always have a valid response struct. // ("ClientPaho", "onConnectSuccess - no response data"); } MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent)); cl->pushIncomingEvent([cl, resp]() { cl->onConnectSuccessOnInstance(resp); }); } } // static void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response) { if (context) { auto* cl = reinterpret_cast(context); MqttFailure resp(response); cl->pushIncomingEvent([cl, resp]() { cl->onConnectFailureOnInstance(resp); }); } } //// static //void ClientPaho::onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode) //{ // apply_unused_parameters(properties); // try { // if (context) { // auto* cl = reinterpret_cast(context); // cl->pushIncomingEvent([cl, reasonCode]() { cl->onDisconnectOnInstance(reasonCode); }); // } // } // catch (...) { // } // catch (const std::exception& e) { // MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - exception : %1", e.what()); // } // catch (...) { // MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - unknown exception"); // } //} // static void ClientPaho::onDisconnectSuccess(void* context, MQTTAsync_successData* response) { if (context) { auto* cl = reinterpret_cast(context); MqttSuccess resp(response ? response->token : 0); cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectSuccessOnInstance(resp); }); } } // static void ClientPaho::onDisconnectFailure(void* context, MQTTAsync_failureData* response) { if (context) { auto* cl = reinterpret_cast(context); MqttFailure resp(response); cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectFailureOnInstance(resp); }); } } // static void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response) { if (context) { auto* cl = reinterpret_cast(context); if (!response) { // publish should always have a valid response struct. // toLogFile ("ClientPaho", "onPublishSuccess - no response data"); } MqttSuccess resp(response->token, MqttMessage(response->alt.pub.destinationName == nullptr ? "null" : response->alt.pub.destinationName, response->alt.pub.message)); cl->pushIncomingEvent([cl, resp]() { cl->onPublishSuccessOnInstance(resp); }); } } // static void ClientPaho::onPublishFailure(void* context, MQTTAsync_failureData* response) { (void)response; if (context) { auto* cl = reinterpret_cast(context); MqttFailure resp(response); cl->pushIncomingEvent([cl, resp]() { cl->onPublishFailureOnInstance(resp); }); } } // static void ClientPaho::onSubscribeSuccess(void* context, MQTTAsync_successData* response) { if (context) { auto* cl = reinterpret_cast(context); if (!response) { // subscribe should always have a valid response struct. // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data"); } MqttSuccess resp(response->token, response->alt.qos); cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeSuccessOnInstance(resp); }); } } // static void ClientPaho::onSubscribeFailure(void* context, MQTTAsync_failureData* response) { if (context) { auto* cl = reinterpret_cast(context); MqttFailure resp(response); cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeFailureOnInstance(resp); }); } } // static void ClientPaho::onUnsubscribeSuccess(void* context, MQTTAsync_successData* response) { if (context) { auto* cl = reinterpret_cast(context); MqttSuccess resp(response ? response->token : -1); cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeSuccessOnInstance(resp); }); } } // static void ClientPaho::onUnsubscribeFailure(void* context, MQTTAsync_failureData* response) { if (context) { auto* cl = reinterpret_cast(context); MqttFailure resp(response); cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeFailureOnInstance(resp); }); } } // static int ClientPaho::onMessageArrived(void* context, char* topicName, int, MQTTAsync_message* message) { OSDEV_COMPONENTS_SCOPEGUARD(freeMessage, [&topicName, &message]() { MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); }); if (context) { auto* cl = reinterpret_cast(context); MqttMessage msg(topicName, *message); cl->pushIncomingEvent([cl, msg]() { cl->onMessageArrivedOnInstance(msg); }); } return 1; // always return true. Otherwise this callback is triggered again. } // static void ClientPaho::onDeliveryComplete(void* context, MQTTAsync_token token) { if (context) { auto* cl = reinterpret_cast(context); cl->pushIncomingEvent([cl, token]() { cl->onDeliveryCompleteOnInstance(token); }); } } // static void ClientPaho::onConnectionLost(void* context, char* cause) { OSDEV_COMPONENTS_SCOPEGUARD(freeCause, [&cause]() { if (cause) { MQTTAsync_free(cause); } }); if (context) { auto* cl = reinterpret_cast(context); std::string msg(nullptr == cause ? "cause unknown" : cause); cl->pushIncomingEvent([cl, msg]() { cl->onConnectionLostOnInstance(msg); }); } } // static void ClientPaho::onLogPaho(enum MQTTASYNC_TRACE_LEVELS level, char* message) { (void)message; switch (level) { case MQTTASYNC_TRACE_MAXIMUM: case MQTTASYNC_TRACE_MEDIUM: case MQTTASYNC_TRACE_MINIMUM: { // ("ClientPaho", "paho - %1", message) break; } case MQTTASYNC_TRACE_PROTOCOL: { // ("ClientPaho", "paho - %1", message) break; } case MQTTASYNC_TRACE_ERROR: case MQTTASYNC_TRACE_SEVERE: case MQTTASYNC_TRACE_FATAL: { // ("ClientPaho", "paho - %1", message) break; } } }