diff --git a/CMakeLists.txt b/CMakeLists.txt index 5555d8a..f285463 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,6 +8,7 @@ include(projectheader) project_header(osdev_mqtt) add_subdirectory(src) +add_subdirectory(examples/connect) add_subdirectory(examples/pub) add_subdirectory(examples/sub) diff --git a/debug_log.txt b/debug_log.txt new file mode 100644 index 0000000..9c6b103 --- /dev/null +++ b/debug_log.txt @@ -0,0 +1,39 @@ +With broker present + +Jul 02 00:50:19 intelnuc64.osdev.nl test_connection[30797]: MQTT Client started[30797]: [MqttClient::MqttClient] +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::eventHandler][30797]: ConnectionTest - starting event handler. +Jul 02 00:50:19 intelnuc64.osdev.nl MqttClient[30797]: ConnectionTest - Request connect +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho][30797]: ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - Setting the extra onConnected callback. +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][30797]: ConnectionTest - connection status of wrapped client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a changed to 2 +Jul 02 00:50:19 intelnuc64.osdev.nl ClientPaho[30797]: ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - starting callback event handler +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho::onConnectSuccess][30797]: onConnectSucces triggered.. +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho][30797]: onConnectSuccessOnInstance ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - connected to endpoint localhost:1883 (mqtt version 4, session present FALSE ) +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][30797]: ConnectionTest - connection status of wrapped client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a changed to 4 +Jul 02 00:50:30 intelnuc64.osdev.nl mosquitto[29463]: 1656715830: Client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a closed its connection. + + +Without a broker present + +Jul 02 00:55:33 intelnuc64.osdev.nl test_connection[31574]: MQTT Client started[31574]: [MqttClient::MqttClient] +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::eventHandler][31574]: ConnectionTest - starting event handler. +Jul 02 00:55:33 intelnuc64.osdev.nl MqttClient[31574]: ConnectionTest - Request connect +Jul 02 00:55:33 intelnuc64.osdev.nl [ClientPaho][31574]: ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - Setting the extra onConnected callback. +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][31574]: ConnectionTest - connection status of wrapped client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 changed to 2 +Jul 02 00:55:33 intelnuc64.osdev.nl ClientPaho[31574]: ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - starting callback event handler +Jul 02 00:55:33 intelnuc64.osdev.nl ClientPaho[31574]: onConnectFailureOnInstanceConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - connection failed with code MQTTASYNC_FAILURE (TCP/TLS connect failure) +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][31574]: ConnectionTest - connection status of wrapped client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 changed to 0 + +Jul 02 00:55:42 intelnuc64.osdev.nl systemd[1]: Starting Mosquitto MQTT Broker... +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: mosquitto version 2.0.14 starting +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Config loaded from /etc/mosquitto/mosquitto.conf. +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Starting in local only mode. Connections will only be possible from clients running on this machine. +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Create a configuration file which defines a listener to allow remote access. +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: For more details see https://mosquitto.org/documentation/authentication-methods/ +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Opening ipv4 listen socket on port 1883. +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Opening ipv6 listen socket on port 1883. +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: mosquitto version 2.0.14 running +Jul 02 00:55:45 intelnuc64.osdev.nl mosquitto[31610]: 1656716145: New connection from 127.0.0.1:42196 on port 1883. +Jul 02 00:55:45 intelnuc64.osdev.nl mosquitto[31610]: 1656716145: New client connected from 127.0.0.1:42196 as ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 (p2, c1, k5). +Jul 02 00:55:53 intelnuc64.osdev.nl mosquitto[31610]: 1656716153: Client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 closed its connection. + + diff --git a/examples/connect/CMakeLists.txt b/examples/connect/CMakeLists.txt new file mode 100644 index 0000000..06c38e8 --- /dev/null +++ b/examples/connect/CMakeLists.txt @@ -0,0 +1,32 @@ +cmake_minimum_required(VERSION 3.0) +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake) + +include(projectheader) +project_header(test_connections) + +include_directories( SYSTEM + ${CMAKE_CURRENT_SOURCE_DIR}/../../include +) + +include(compiler) +set(SRC_LIST + ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp +) + +add_executable( ${PROJECT_NAME} + ${SRC_LIST} +) + +target_link_libraries( + ${PROJECT_NAME} + mqtt-cpp +) + +set_target_properties( ${PROJECT_NAME} PROPERTIES + RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin + LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib + ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib +) + +include(installation) +install_application() diff --git a/examples/connect/main.cpp b/examples/connect/main.cpp new file mode 100644 index 0000000..d7c51f1 --- /dev/null +++ b/examples/connect/main.cpp @@ -0,0 +1,26 @@ + +#include +#include + +#include "mqttclient.h" + +using namespace osdev::components::mqtt; + +int main( int argc, char* argv[] ) +{ + (void)argc; + (void)argv; + + MqttClient oClient( "ConnectionTest" ); + oClient.connect( "localhost", 1883, Credentials() ); + + unsigned int nCount = 0; + + while(1) + { + std::cout << "[" << std::to_string(nCount++) << "] MQTT Client status : " << oClient.state() << std::endl; + sleep( 1 ); + } + + return 0; +} diff --git a/examples/pub/main.cpp b/examples/pub/main.cpp index 485d059..bc91d51 100644 --- a/examples/pub/main.cpp +++ b/examples/pub/main.cpp @@ -94,7 +94,7 @@ int main( int argc, char* argv[] ) } messageNumber++; } - sleepcp( 5, T_SECONDS ); + sleepcp( 1, T_SECONDS ); } } else diff --git a/examples/sub/main.cpp b/examples/sub/main.cpp index a9c18fb..c9f1850 100644 --- a/examples/sub/main.cpp +++ b/examples/sub/main.cpp @@ -83,7 +83,7 @@ int main( int argc, char* argv[] ) // Start a loop to give the subscriber the possibility to do its work. while( 1 ) { - sleepcp( 1, T_MICRO ); // Sleep 1 Sec to give the scheduler the change to interfene. + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. } } else diff --git a/include/clientpaho.h b/include/clientpaho.h index cf852a6..049c885 100644 --- a/include/clientpaho.h +++ b/include/clientpaho.h @@ -215,7 +215,7 @@ private: * The connection status is set to Connected. * @param response A success response with connection data. */ - void onConnectSuccessOnInstance(const MqttSuccess& response); + void onConnectSuccessOnInstance(); /** * @brief Callback that is called when a connect call fails after being sent to the endpoint. @@ -298,6 +298,7 @@ private: void onConnectionLostOnInstance(const std::string& cause); // Static callback functions that are registered on the paho library. Functions call their *OnInstance() counterparts. + static void onFirstConnect(void* context, char* cause); static void onConnect(void* context, char* cause); static void onConnectSuccess(void* context, MQTTAsync_successData* response); static void onConnectFailure(void* context, MQTTAsync_failureData* response); diff --git a/include/imqttclient.h b/include/imqttclient.h index 452fd17..4540909 100644 --- a/include/imqttclient.h +++ b/include/imqttclient.h @@ -60,13 +60,13 @@ public: * @param port The port to use. * @param credentials The credentials to use. */ - virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; + virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0; /** * @brief Connect to the endpoint * @param endpoint an uri endpoint description. */ - virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; + virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0; /** * @brief Disconnect the client from the broker diff --git a/include/mqttclient.h b/include/mqttclient.h index 12908f5..391908a 100644 --- a/include/mqttclient.h +++ b/include/mqttclient.h @@ -92,12 +92,12 @@ public: /** * @see IMqttClient */ - virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT() ) override; + virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override; /** * @see IMqttClient */ - virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) override; + virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override; /** * @see IMqttClient diff --git a/src/clientpaho.cpp b/src/clientpaho.cpp index 47bd9b9..24e6239 100644 --- a/src/clientpaho.cpp +++ b/src/clientpaho.cpp @@ -24,6 +24,7 @@ #include "errorcode.h" #include "mqttutil.h" #include "lockguard.h" +#include "log.h" #include "metaprogrammingdefs.h" #include "mqttstream.h" #include "scopeguard.h" @@ -93,10 +94,10 @@ struct Init std::atomic_int ClientPaho::s_numberOfInstances(0); -ClientPaho::ClientPaho(const std::string& _endpoint, +ClientPaho::ClientPaho( const std::string& _endpoint, const std::string& _id, - const std::function& connectionStatusCallback, - const std::function& deliveryCompleteCallback) + const std::function& connectionStatusCallback, + const std::function& deliveryCompleteCallback ) : m_mutex() , m_endpoint() , m_username() @@ -122,51 +123,55 @@ ClientPaho::ClientPaho(const std::string& _endpoint, , m_callbackEventQueue(m_clientId) , m_workerThread() { - if (0 == s_numberOfInstances++) { - MQTTAsync_setTraceCallback(&ClientPaho::onLogPaho); + if( 0 == s_numberOfInstances++ ) + { + MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho ); } - // MLOGIC_COMMON_DEBUG("ClientPaho", "%1 - ctor ClientPaho %2", m_clientId, this); + + LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) ); parseEndpoint(_endpoint); - auto rc = MQTTAsync_create(&m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr); - if (MQTTASYNC_SUCCESS == rc) + + auto rc = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr ); + if( MQTTASYNC_SUCCESS == rc ) { - MQTTAsync_setCallbacks(m_client, reinterpret_cast(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete); - m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this); + MQTTAsync_setCallbacks( m_client, reinterpret_cast(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete ); + m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this ); } else { - // Do something sensible here. + LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( rc ) ) ); } } ClientPaho::~ClientPaho() { + LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) ); if( MQTTAsync_isConnected( m_client ) ) { this->unsubscribeAll(); - this->waitForCompletion(std::chrono::milliseconds(2000), std::set{}); - this->disconnect(true, 5000); + this->waitForCompletion( std::chrono::milliseconds(2000), std::set{} ); + this->disconnect( true, 5000 ); } else { // If the status was already disconnected this call does nothing - setConnectionStatus(ConnectionStatus::Disconnected); + setConnectionStatus( ConnectionStatus::Disconnected ); } - if (0 == --s_numberOfInstances) + if( 0 == --s_numberOfInstances ) { // encountered a case where termination of the logging system within paho led to a segfault. // This was a paho thread that was cleaned while at the same time the logging system was terminated. // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less // frequently. - MQTTAsync_setTraceCallback(nullptr); + MQTTAsync_setTraceCallback( nullptr ); } - MQTTAsync_destroy(&m_client); + MQTTAsync_destroy( &m_client ); m_callbackEventQueue.stop(); - if (m_workerThread.joinable()) + if( m_workerThread.joinable() ) { m_workerThread.join(); } @@ -193,14 +198,28 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) setConnectionStatus( ConnectionStatus::ConnectInProgress ); } + LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) ); + MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; - conn_opts.keepAliveInterval = 20; + conn_opts.keepAliveInterval = 5; conn_opts.cleansession = 1; - conn_opts.onSuccess = &ClientPaho::onConnectSuccess; + conn_opts.onSuccess = nullptr; conn_opts.onFailure = &ClientPaho::onConnectFailure; conn_opts.context = this; conn_opts.automaticReconnect = 1; + // Make sure we get a signal if the promise is fulfilled + auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast(this), ClientPaho::onFirstConnect ); + if( MQTTASYNC_SUCCESS == ccb ) + { + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") ); + } + else + { + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") ); + } + + // Setup the last will and testament, if so desired. if( !lwt.topic().empty() ) { MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer; @@ -208,13 +227,14 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) will_opts.topicName = lwt.topic().c_str(); conn_opts.will = &will_opts; + + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) ); } else { conn_opts.will = nullptr; } - if( !m_username.empty() ) { conn_opts.username = m_username.c_str(); @@ -259,18 +279,19 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) return -100; } -std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) +std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs ) { ConnectionStatus currentStatus = m_connectionStatus; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - if (ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus) { + if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus ) + { return -1; } currentStatus = m_connectionStatus; - setConnectionStatus(ConnectionStatus::DisconnectInProgress); + setConnectionStatus( ConnectionStatus::DisconnectInProgress ); } MQTTAsync_disconnectOptions disconn_opts = Init::initialize(); @@ -282,43 +303,45 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) std::promise waitForDisconnectPromise{}; auto waitForDisconnect = waitForDisconnectPromise.get_future(); m_disconnectPromise.reset(); - if (wait) { + if( wait ) + { m_disconnectPromise = std::make_unique>(std::move(waitForDisconnectPromise)); } { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - if (!m_pendingOperations.insert(-200).second) + if( !m_pendingOperations.insert( -200 ).second ) { - // "ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200) + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) ); } - m_operationResult.erase(-200); + m_operationResult.erase( -200 ); } - int rc = MQTTAsync_disconnect(m_client, &disconn_opts); - if (MQTTASYNC_SUCCESS != rc) { - if (MQTTASYNC_DISCONNECTED == rc) { + int rc = MQTTAsync_disconnect( m_client, &disconn_opts ); + if( MQTTASYNC_SUCCESS != rc ) + { + if( MQTTASYNC_DISCONNECTED == rc ) + { currentStatus = ConnectionStatus::Disconnected; } - setConnectionStatus(currentStatus); - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); + setConnectionStatus( currentStatus ); + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); m_operationResult[-200] = false; - m_pendingOperations.erase(-200); + m_pendingOperations.erase( -200 ); - if (MQTTASYNC_DISCONNECTED == rc) + if( MQTTASYNC_DISCONNECTED == rc ) { return -1; } - // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) ); } if( wait ) { if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) { - // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId); - + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) ); } waitForDisconnect.get(); m_disconnectPromise.reset(); @@ -326,67 +349,67 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) return -200; } -std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) +std::int32_t ClientPaho::publish( const MqttMessage& message, int qos ) { - if (ConnectionStatus::DisconnectInProgress == m_connectionStatus) + if( ConnectionStatus::DisconnectInProgress == m_connectionStatus ) { - // ("ClientPaho", "%1 - disconnect in progress, ignoring publish with qos %2 on topic %3", m_clientId, qos, message.topic()); + LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) ); return -1; } - else if (ConnectionStatus::Disconnected == m_connectionStatus) + else if( ConnectionStatus::Disconnected == m_connectionStatus ) { - // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) ); connect( true ); } - if (!isValidTopic(message.topic())) + if( !isValidTopic(message.topic() ) ) { - // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, message.topic()); + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) ); } - if (qos > 2) + if( qos > 2 ) { qos = 2; } - else if (qos < 0) + else if( qos < 0 ) { qos = 0; } - std::unique_lock lck(m_mutex); - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) { + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes ) + { m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; }); - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus) { - // ("ClientPaho", "Adding publish to pending queue."); - m_pendingPublishes.push_front(Publish{ qos, message }); + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus ) + { + LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." ); + m_pendingPublishes.push_front( Publish{ qos, message } ); return -1; } } - return publishInternal(message, qos); + return publishInternal( message, qos ); } void ClientPaho::publishPending() { { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - if (!m_processPendingPublishes) + if( !m_processPendingPublishes ) { return; } } - if (ConnectionStatus::Connected != m_connectionStatus) + if( ConnectionStatus::Connected != m_connectionStatus ) { - // MqttException, "Not connected"); + LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) ) } - while (!m_pendingPublishes.empty()) + while( !m_pendingPublishes.empty() ) { const auto& pub = m_pendingPublishes.back(); - publishInternal(pub.data, pub.qos); - // else ("ClientPaho", "%1 - pending publish on topic %2 failed : %3", m_clientId, pub.data.topic(), e.what()); + publishInternal( pub.data, pub.qos ); m_pendingPublishes.pop_back(); } @@ -398,23 +421,23 @@ void ClientPaho::publishPending() m_pendingPublishesReadyCV.notify_all(); } -std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std::function& cb) +std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function& cb ) { - if (ConnectionStatus::Connected != m_connectionStatus) + if( ConnectionStatus::Connected != m_connectionStatus ) { // MqttException, "Not connected" } - if (!isValidTopic(topic)) + if( !isValidTopic( topic ) ) { // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); } - if (qos > 2) + if( qos > 2 ) { qos = 2; } - else if (qos < 0) + else if( qos < 0 ) { qos = 0; } @@ -423,21 +446,27 @@ std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std: OSDEV_COMPONENTS_LOCKGUARD(m_mutex); auto itExisting = m_subscriptions.find(topic); - if (m_subscriptions.end() != itExisting) { - if (itExisting->second.qos == qos) { + if( m_subscriptions.end() != itExisting ) + { + if( itExisting->second.qos == qos ) + { return -1; } // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic); } auto itPending = m_pendingSubscriptions.find(topic); - if (m_pendingSubscriptions.end() != itPending) { - if (itPending->second.qos == qos) { - auto itToken = std::find_if(m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair& item) { return topic == item.second; }); - if (m_subscribeTokenToTopic.end() != itToken) { + if( m_pendingSubscriptions.end() != itPending ) + { + if( itPending->second.qos == qos ) + { + auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair& item) { return topic == item.second; } ); + if( m_subscribeTokenToTopic.end() != itToken ) + { return itToken->first; } - else { + else + { return -1; } } @@ -445,45 +474,45 @@ std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std: } std::string existingTopic{}; - if (isOverlappingInternal(topic, existingTopic)) + if( isOverlappingInternal( topic, existingTopic ) ) { // (OverlappingTopicException, "overlapping topic", existingTopic, topic); } - // ("ClientPaho", "%1 - adding subscription on topic %2 to the pending subscriptions", m_clientId, topic); - m_pendingSubscriptions.emplace(std::make_pair(topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb })); + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) ); + m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) ); } - return subscribeInternal(topic, qos); + return subscribeInternal( topic, qos ); } void ClientPaho::resubscribe() { - decltype(m_pendingSubscriptions) pendingSubscriptions{}; + decltype( m_pendingSubscriptions ) pendingSubscriptions{}; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - std::copy(m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end())); + std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) ); } - for (const auto& s : pendingSubscriptions) + for( const auto& s : pendingSubscriptions ) { - subscribeInternal(s.first, s.second.qos); + subscribeInternal( s.first, s.second.qos ); } } std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) { { - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); bool found = false; - for (const auto& s : m_subscriptions) + for( const auto& s : m_subscriptions ) { - if (topic == s.first && qos == s.second.qos) + if( topic == s.first && qos == s.second.qos ) { found = true; break; } } - if (!found) + if( !found ) { return -1; } @@ -498,21 +527,21 @@ std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - auto rc = MQTTAsync_unsubscribe(m_client, topic.c_str(), &opts); - if (MQTTASYNC_SUCCESS != rc) + auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts ); + if( MQTTASYNC_SUCCESS != rc ) { - // ("ClientPaho", "%1 - unsubscribe on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); } - if (!m_pendingOperations.insert(opts.token).second) + if( !m_pendingOperations.insert( opts.token ).second ) { - // ("ClientPaho", "%1 unsubscribe - token %2 already in use", m_clientId, opts.token); + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token ) + " already in use" ) ); } - m_operationResult.erase(opts.token); - if (m_unsubscribeTokenToTopic.count(opts.token) > 0) + m_operationResult.erase( opts.token ); + if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 ) { - // ("ClientPaho", "%1 - token already in use, replacing unsubscribe from topic %2 with topic %3", m_clientId, m_unsubscribeTokenToTopic[opts.token], topic); + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) ); } m_lastUnsubscribe = opts.token; // centos7 workaround m_unsubscribeTokenToTopic[opts.token] = topic; @@ -526,39 +555,42 @@ std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) void ClientPaho::unsubscribeAll() { - decltype(m_subscriptions) subscriptions{}; + decltype( m_subscriptions ) subscriptions{}; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); subscriptions = m_subscriptions; } - for (const auto& s : subscriptions) { - this->unsubscribe(s.first, s.second.qos); + for( const auto& s : subscriptions ) + { + this->unsubscribe( s.first, s.second.qos ); } } std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set& tokens) const { - if (waitFor <= std::chrono::milliseconds(0)) { - return std::chrono::milliseconds(0); + if( waitFor <= std::chrono::milliseconds( 0 ) ) + { + return std::chrono::milliseconds( 0 ); } std::chrono::milliseconds timeElapsed{}; { - osdev::components::mqtt::measurement::TimeMeasurement msr("waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds) + osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds ) { - timeElapsed = std::chrono::ceil(sinceStart); + timeElapsed = std::chrono::ceil( sinceStart ); }); std::unique_lock lck(m_mutex); + // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations); m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]() { - if (tokens.empty()) + if( tokens.empty() ) { // wait for all operations to end return m_pendingOperations.empty(); } - else if (tokens.size() == 1) + else if( tokens.size() == 1 ) { - return m_pendingOperations.find(*tokens.cbegin()) == m_pendingOperations.end(); + return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end(); } std::vector intersect{}; std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect)); @@ -568,16 +600,16 @@ std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::millisecond return timeElapsed; } -bool ClientPaho::isOverlapping(const std::string& topic) const +bool ClientPaho::isOverlapping( const std::string& topic ) const { std::string existingTopic{}; - return isOverlapping(topic, existingTopic); + return isOverlapping( topic, existingTopic ); } -bool ClientPaho::isOverlapping(const std::string& topic, std::string& existingTopic) const +bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - return isOverlappingInternal(topic, existingTopic); + return isOverlappingInternal( topic, existingTopic ); } std::vector ClientPaho::pendingOperations() const @@ -585,7 +617,7 @@ std::vector ClientPaho::pendingOperations() const OSDEV_COMPONENTS_LOCKGUARD(m_mutex); std::vector retval{}; retval.resize(m_pendingOperations.size()); - std::copy(m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin()); + std::copy( m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin() ); return retval; } @@ -595,36 +627,36 @@ bool ClientPaho::hasPendingSubscriptions() const return !m_pendingSubscriptions.empty(); } -boost::optional ClientPaho::operationResult(std::int32_t token) const +boost::optional ClientPaho::operationResult( std::int32_t token ) const { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); boost::optional ret{}; - auto cit = m_operationResult.find(token); - if (m_operationResult.end() != cit) + auto cit = m_operationResult.find( token ); + if( m_operationResult.end() != cit ) { ret = cit->second; } return ret; } -void ClientPaho::parseEndpoint(const std::string& _endpoint) +void ClientPaho::parseEndpoint( const std::string& _endpoint ) { - auto ep = UriParser::parse(_endpoint); - if (ep.find("user") != ep.end()) + auto ep = UriParser::parse( _endpoint ); + if( ep.find( "user" ) != ep.end() ) { m_username = ep["user"]; ep["user"].clear(); } - if (ep.find("password") != ep.end()) + if( ep.find( "password" ) != ep.end() ) { m_password = ep["password"]; ep["password"].clear(); } - m_endpoint = UriParser::toString(ep); + m_endpoint = UriParser::toString( ep ); } -std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos) +std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos ) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onPublishSuccess; @@ -637,21 +669,21 @@ std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos) // the insertion of the token into the pending operations. // OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - auto rc = MQTTAsync_sendMessage(m_client, message.topic().c_str(), &msg, &opts); - if (MQTTASYNC_SUCCESS != rc) + auto rc = MQTTAsync_sendMessage( m_client, message.topic().c_str(), &msg, &opts ); + if( MQTTASYNC_SUCCESS != rc ) { - // ("ClientPaho", "%1 - publish on topic %2 failed with code %3", m_clientId, message.topic(), pahoAsyncErrorCodeToString(rc)); + LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); } - if (!m_pendingOperations.insert(opts.token).second) + if( !m_pendingOperations.insert( opts.token ).second ) { - // ("ClientPaho", "%1 publishInternal - token %2 already in use", m_clientId, opts.token); + LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) ); } - m_operationResult.erase(opts.token); + m_operationResult.erase( opts.token ); return opts.token; } -std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos) +std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos ) { MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; opts.onSuccess = &ClientPaho::onSubscribeSuccess; @@ -661,52 +693,51 @@ std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos) // Need to lock the mutex because it is possible that the callback is faster than // the insertion of the token into the pending operations. OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - auto rc = MQTTAsync_subscribe(m_client, topic.c_str(), qos, &opts); + auto rc = MQTTAsync_subscribe( m_client, topic.c_str(), qos, &opts ); if (MQTTASYNC_SUCCESS != rc) { - m_pendingSubscriptions.erase(topic); - // ("ClientPaho", "%1 - subscription on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); - // (MqttException, "Subscription failed"); + m_pendingSubscriptions.erase( topic ); + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); } - if (!m_pendingOperations.insert(opts.token).second) + if( !m_pendingOperations.insert( opts.token ).second ) { - // ("ClientPaho", "%1 subscribe - token %2 already in use", m_clientId, opts.token); + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) ); } - m_operationResult.erase(opts.token); - if (m_subscribeTokenToTopic.count(opts.token) > 0) + m_operationResult.erase( opts.token ); + if( m_subscribeTokenToTopic.count( opts.token ) > 0 ) { - // ("ClientPaho", "%1 - overwriting pending subscription on topic %2 with topic %3", m_clientId, m_subscribeTokenToTopic[opts.token], topic); + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) ); } m_subscribeTokenToTopic[opts.token] = topic; return opts.token; } -void ClientPaho::setConnectionStatus(ConnectionStatus status) +void ClientPaho::setConnectionStatus( ConnectionStatus status ) { ConnectionStatus curStatus = m_connectionStatus; m_connectionStatus = status; - if (status != curStatus && m_connectionStatusCallback) + if( status != curStatus && m_connectionStatusCallback ) { - m_connectionStatusCallback(m_clientId, status); + m_connectionStatusCallback( m_clientId, status ); } } -bool ClientPaho::isOverlappingInternal(const std::string& topic, std::string& existingTopic) const +bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const { existingTopic.clear(); - for (const auto& s : m_pendingSubscriptions) + for( const auto& s : m_pendingSubscriptions ) { - if (testForOverlap(s.first, topic)) + if( testForOverlap( s.first, topic ) ) { existingTopic = s.first; return true; } } - for (const auto& s : m_subscriptions) + for( const auto& s : m_subscriptions ) { - if (testForOverlap(s.first, topic)) + if( testForOverlap( s.first, topic ) ) { existingTopic = s.first; return true; @@ -715,87 +746,86 @@ bool ClientPaho::isOverlappingInternal(const std::string& topic, std::string& ex return false; } -void ClientPaho::pushIncomingEvent(std::function ev) +void ClientPaho::pushIncomingEvent( std::function ev ) { - m_callbackEventQueue.push(ev); + m_callbackEventQueue.push( ev ); } void ClientPaho::callbackEventHandler() { - // ("ClientPaho", "%1 - starting callback event handler", m_clientId); - for (;;) { + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) ); + for( ;; ) + { std::vector> events; - if (!m_callbackEventQueue.pop(events)) + if( !m_callbackEventQueue.pop(events) ) { break; } - for (const auto& ev : events) + for( const auto& ev : events ) { ev(); - // ("ClientPaho", "%1 - Exception occurred: %2", m_clientId, mlogicException); } } - // ("ClientPaho", "%1 - leaving callback event handler", m_clientId); + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - leaving callback event handler" ) ); } - -void ClientPaho::onConnectOnInstance(const std::string& cause) +void ClientPaho::onConnectOnInstance( const std::string& cause ) { - (void)cause; - // toLogFile ("ClientPaho", "onConnectOnInstance %1 - reconnected (cause %2)", m_clientId, cause); + (void) cause; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end())); + std::copy( m_subscriptions.begin(), m_subscriptions.end(), std::inserter( m_pendingSubscriptions, m_pendingSubscriptions.end() ) ); m_subscriptions.clear(); m_processPendingPublishes = true; // all publishes are on hold until publishPending is called. } - setConnectionStatus(ConnectionStatus::Connected); + setConnectionStatus( ConnectionStatus::Connected ); } -void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response) +void ClientPaho::onConnectSuccessOnInstance() { - auto connectData = response.connectionData(); - // ("ClientPaho", "onConnectSuccessOnInstance %1 - connected to endpoint %2 (mqtt version %3, session present %4)", - // m_clientId, connectData.serverUri(), connectData.mqttVersion(), connectData.sessionPresent()); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Register the connect callback that is used in reconnect scenarios. - auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect); - if (MQTTASYNC_SUCCESS != rc) + auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect ); + if( MQTTASYNC_SUCCESS != rc ) { - // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the connected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); + LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); } + // For MQTTV5 //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect); //if (MQTTASYNC_SUCCESS != rc) { // // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); //} // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); + m_operationResult[-100] = true; m_pendingOperations.erase(-100); } - setConnectionStatus(ConnectionStatus::Connected); - if (m_connectPromise) + + setConnectionStatus( ConnectionStatus::Connected ); + if( m_connectPromise ) { + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) ); m_connectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } -void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) +void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response ) { - (void)response; - // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); + (void) response; + LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")")); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); m_operationResult[-100] = false; m_pendingOperations.erase(-100); } - if (ConnectionStatus::ConnectInProgress == m_connectionStatus) + if( ConnectionStatus::ConnectInProgress == m_connectionStatus ) { - setConnectionStatus(ConnectionStatus::Disconnected); + setConnectionStatus( ConnectionStatus::Disconnected ); } m_operationsCompleteCV.notify_all(); } @@ -805,9 +835,9 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) // MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode)); //} -void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&) +void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& ) { - // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - disconnected from endpoint %2", m_clientId, m_endpoint); + LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); m_subscriptions.clear(); @@ -820,45 +850,46 @@ void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&) m_pendingOperations.clear(); } - setConnectionStatus(ConnectionStatus::Disconnected); + setConnectionStatus( ConnectionStatus::Disconnected ); - if (m_disconnectPromise) { + if( m_disconnectPromise ) + { m_disconnectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } -void ClientPaho::onDisconnectFailureOnInstance(const MqttFailure& response) +void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response ) { - (void)response; - // ("ClientPaho", "onDisconnectFailureOnInstance %1 - disconnect failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); + (void) response; + LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) ); { - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); + OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations); m_operationResult[-200] = false; m_pendingOperations.erase(-200); } - if (MQTTAsync_isConnected(m_client)) + if( MQTTAsync_isConnected( m_client ) ) { - setConnectionStatus(ConnectionStatus::Connected); + setConnectionStatus( ConnectionStatus::Connected ); } else { - setConnectionStatus(ConnectionStatus::Disconnected); + setConnectionStatus( ConnectionStatus::Disconnected ); } - if (m_disconnectPromise) + if( m_disconnectPromise ) { m_disconnectPromise->set_value(); } m_operationsCompleteCV.notify_all(); } -void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess& response) +void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response ) { auto pd = response.publishData(); - // ("ClientPaho", "onPublishSuccessOnInstance %1 - publish with token %2 succeeded (message was %3)", m_clientId, response.token(), pd.payload()); + LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); @@ -868,9 +899,9 @@ void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess& response) m_operationsCompleteCV.notify_all(); } -void ClientPaho::onPublishFailureOnInstance(const MqttFailure& response) +void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response ) { - // ("ClientPaho", "onPublishFailureOnInstance %1 - publish with token %2 failed with code %3 (%4)", m_clientId, response.token(), response.codeToString(), response.message()); + LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); @@ -880,78 +911,82 @@ void ClientPaho::onPublishFailureOnInstance(const MqttFailure& response) m_operationsCompleteCV.notify_all(); } -void ClientPaho::onSubscribeSuccessOnInstance(const MqttSuccess& response) +void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response ) { - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscribe with token %2 succeeded", m_clientId, response.token()); - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) ); + + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); bool operationOk = false; - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response, &operationOk]() + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]() { // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = operationOk; - m_pendingOperations.erase(response.token()); + m_pendingOperations.erase( response.token() ); }); - auto it = m_subscribeTokenToTopic.find(response.token()); - if (m_subscribeTokenToTopic.end() == it) { - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, response.token()); + auto it = m_subscribeTokenToTopic.find( response.token() ); + if( m_subscribeTokenToTopic.end() == it ) + { + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); return; } auto topic = it->second; m_subscribeTokenToTopic.erase(it); - auto pendingIt = m_pendingSubscriptions.find(topic); - if (m_pendingSubscriptions.end() == pendingIt) + auto pendingIt = m_pendingSubscriptions.find( topic ); + if( m_pendingSubscriptions.end() == pendingIt ) { - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) ); return; } - if (response.qos() != pendingIt->second.qos) + if( response.qos() != pendingIt->second.qos ) { - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscription requested qos %2, endpoint assigned qos %3", m_clientId, pendingIt->second.qos, response.qos()); + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) ); } - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - move pending subscription on topic %2 to the registered subscriptions", m_clientId, topic); - m_subscriptions.emplace(std::make_pair(pendingIt->first, std::move(pendingIt->second))); - m_pendingSubscriptions.erase(pendingIt); + + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) ); + m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) ); + m_pendingSubscriptions.erase( pendingIt ); operationOk = true; } -void ClientPaho::onSubscribeFailureOnInstance(const MqttFailure& response) +void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response ) { - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); + + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]() { // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; - m_pendingOperations.erase(response.token()); + m_pendingOperations.erase( response.token() ); }); - auto it = m_subscribeTokenToTopic.find(response.token()); - if (m_subscribeTokenToTopic.end() == it) + auto it = m_subscribeTokenToTopic.find( response.token() ); + if( m_subscribeTokenToTopic.end() == it ) { - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); return; } auto topic = it->second; - m_subscribeTokenToTopic.erase(it); + m_subscribeTokenToTopic.erase( it ); - auto pendingIt = m_pendingSubscriptions.find(topic); - if (m_pendingSubscriptions.end() == pendingIt) + auto pendingIt = m_pendingSubscriptions.find( topic ); + if( m_pendingSubscriptions.end() == pendingIt ) { - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) ); return; } - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - remove pending subscription on topic %2", m_clientId, topic); - m_pendingSubscriptions.erase(pendingIt); + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) ); + m_pendingSubscriptions.erase( pendingIt ); } -void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response) +void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response ) { - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unsubscribe with token %2 succeeded", m_clientId, response.token()); + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) ); - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token. @@ -960,116 +995,117 @@ void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response) // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled // sequentially (see ClientPaho::unsubscribe)! auto token = response.token(); - if (-1 == token) + if( -1 == token ) { token = m_lastUnsubscribe; m_lastUnsubscribe = -1; } bool operationOk = false; - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, token, &operationOk]() + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]() { // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token); m_operationResult[token] = operationOk; - m_pendingOperations.erase(token); + m_pendingOperations.erase( token ); }); - auto it = m_unsubscribeTokenToTopic.find(token); - if (m_unsubscribeTokenToTopic.end() == it) + auto it = m_unsubscribeTokenToTopic.find( token ); + if( m_unsubscribeTokenToTopic.end() == it ) { - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, token); + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) ); return; } auto topic = it->second; - m_unsubscribeTokenToTopic.erase(it); + m_unsubscribeTokenToTopic.erase( it ); - auto registeredIt = m_subscriptions.find(topic); - if (m_subscriptions.end() == registeredIt) { - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - cannot find subscription for token %2", m_clientId, response.token()); + auto registeredIt = m_subscriptions.find( topic ); + if( m_subscriptions.end() == registeredIt ) + { + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) ); return; } - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - remove subscription on topic %2 from the registered subscriptions", m_clientId, topic); - m_subscriptions.erase(registeredIt); + + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) ); + m_subscriptions.erase( registeredIt ); operationOk = true; } -void ClientPaho::onUnsubscribeFailureOnInstance(const MqttFailure& response) +void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response ) { - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]() { // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); m_operationResult[response.token()] = false; - m_pendingOperations.erase(response.token()); + m_pendingOperations.erase( response.token() ); }); - auto it = m_unsubscribeTokenToTopic.find(response.token()); - if (m_unsubscribeTokenToTopic.end() == it) + auto it = m_unsubscribeTokenToTopic.find( response.token() ); + if( m_unsubscribeTokenToTopic.end() == it ) { - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); return; } auto topic = it->second; - m_unsubscribeTokenToTopic.erase(it); + m_unsubscribeTokenToTopic.erase( it ); } -int ClientPaho::onMessageArrivedOnInstance(const MqttMessage& message) +int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message ) { - // ("ClientPaho", "onMessageArrivedOnInstance %1 - received message on topic %2, retained : %3, dup : %4", m_clientId, message.topic(), message.retained(), message.duplicate()); - - std::function cb; + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) ); + std::function cb; { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); - for (const auto& s : m_subscriptions) + for( const auto& s : m_subscriptions ) { - if (boost::regex_match(message.topic(), s.second.topicRegex)) + if( boost::regex_match( message.topic(), s.second.topicRegex ) ) { cb = s.second.callback; } } } - if (cb) + if( cb ) { - cb(message); + cb( message ); } else { - // ("ClientPaho", "onMessageArrivedOnInstance %1 - no topic filter found for message received on topic %2", m_clientId, message.topic()); + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) ); } return 1; } -void ClientPaho::onDeliveryCompleteOnInstance(MQTTAsync_token token) +void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token ) { - // ("ClientPaho", "onDeliveryCompleteOnInstance %1 - message with token %2 is delivered", m_clientId, token); - if (m_deliveryCompleteCallback) + LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) ); + if( m_deliveryCompleteCallback ) { - m_deliveryCompleteCallback(m_clientId, static_cast(token)); + m_deliveryCompleteCallback( m_clientId, static_cast( token ) ); } } -void ClientPaho::onConnectionLostOnInstance(const std::string& cause) +void ClientPaho::onConnectionLostOnInstance( const std::string& cause ) { - (void)cause; + (void) cause; // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause); - setConnectionStatus(ConnectionStatus::ReconnectInProgress); + setConnectionStatus( ConnectionStatus::ReconnectInProgress ); OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Remove all tokens related to subscriptions from the active operations. - for (const auto& p : m_subscribeTokenToTopic) + for( const auto& p : m_subscribeTokenToTopic ) { // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); - m_pendingOperations.erase(p.first); + m_pendingOperations.erase( p.first ); } - for (const auto& p : m_unsubscribeTokenToTopic) + for( const auto& p : m_unsubscribeTokenToTopic ) { // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); - m_pendingOperations.erase(p.first); + m_pendingOperations.erase( p.first ); } // Clear the administration used in the subscribe process. m_subscribeTokenToTopic.clear(); @@ -1077,39 +1113,55 @@ void ClientPaho::onConnectionLostOnInstance(const std::string& cause) } // static -void ClientPaho::onConnect(void* context, char* cause) +void ClientPaho::onFirstConnect( void* context, char* cause ) { - if (context) + LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." ); + if( context ) { - auto* cl = reinterpret_cast(context); - std::string reason(nullptr == cause ? "unknown cause" : cause); - cl->pushIncomingEvent([cl, reason]() { cl->onConnectOnInstance(reason); }); + auto *cl = reinterpret_cast( context ); + std::string reason( nullptr == cause ? "Unknown cause" : cause ); + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } ); + } +} + +void ClientPaho::onConnect( void* context, char* cause ) +{ + LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." ); + if( context ) + { + auto* cl = reinterpret_cast( context ); + std::string reason( nullptr == cause ? "unknown cause" : cause ); + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } ); } } // static -void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response) +void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response ) { - if (context) + LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." ); + if( context ) { - auto* cl = reinterpret_cast(context); - if (!response) { + auto* cl = reinterpret_cast( context ); + if( !response ) + { // connect should always have a valid response struct. - // ("ClientPaho", "onConnectSuccess - no response data"); + LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" ); + return; } - MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent)); - cl->pushIncomingEvent([cl, resp]() { cl->onConnectSuccessOnInstance(resp); }); + // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent)); + cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } ); } } // static -void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response) +void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response ) { - if (context) + LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) ); + if( context ) { - auto* cl = reinterpret_cast(context); - MqttFailure resp(response); - cl->pushIncomingEvent([cl, resp]() { cl->onConnectFailureOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttFailure resp( response ); + cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } ); } } @@ -1134,34 +1186,35 @@ void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response //} // static -void ClientPaho::onDisconnectSuccess(void* context, MQTTAsync_successData* response) +void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - MqttSuccess resp(response ? response->token : 0); - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectSuccessOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttSuccess resp( response ? response->token : 0 ); + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } ); } } // static -void ClientPaho::onDisconnectFailure(void* context, MQTTAsync_failureData* response) +void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response ) { - if (context) + LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." ); + if( context ) { - auto* cl = reinterpret_cast(context); - MqttFailure resp(response); - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectFailureOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttFailure resp( response ); + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } ); } } // static -void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response) +void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - if (!response) + auto* cl = reinterpret_cast( context ); + if( !response ) { // publish should always have a valid response struct. // toLogFile ("ClientPaho", "onPublishSuccess - no response data"); @@ -1172,134 +1225,137 @@ void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response } // static -void ClientPaho::onPublishFailure(void* context, MQTTAsync_failureData* response) +void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response ) { - (void)response; - if (context) + (void) response; + if( context ) { - auto* cl = reinterpret_cast(context); - MqttFailure resp(response); - cl->pushIncomingEvent([cl, resp]() { cl->onPublishFailureOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttFailure resp( response ); + cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } ); } } // static -void ClientPaho::onSubscribeSuccess(void* context, MQTTAsync_successData* response) +void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - if (!response) + auto* cl = reinterpret_cast( context ); + if( !response ) { // subscribe should always have a valid response struct. // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data"); } - MqttSuccess resp(response->token, response->alt.qos); - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeSuccessOnInstance(resp); }); + MqttSuccess resp( response->token, response->alt.qos ); + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } ); } } // static -void ClientPaho::onSubscribeFailure(void* context, MQTTAsync_failureData* response) +void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - MqttFailure resp(response); - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeFailureOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttFailure resp( response ); + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } ); } } // static -void ClientPaho::onUnsubscribeSuccess(void* context, MQTTAsync_successData* response) +void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - MqttSuccess resp(response ? response->token : -1); - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeSuccessOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttSuccess resp( response ? response->token : -1 ); + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } ); } } // static -void ClientPaho::onUnsubscribeFailure(void* context, MQTTAsync_failureData* response) +void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - MqttFailure resp(response); - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeFailureOnInstance(resp); }); + auto* cl = reinterpret_cast( context ); + MqttFailure resp( response ); + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } ); } } // static -int ClientPaho::onMessageArrived(void* context, char* topicName, int, MQTTAsync_message* message) +int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message ) { - OSDEV_COMPONENTS_SCOPEGUARD(freeMessage, [&topicName, &message]() + OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]() { - MQTTAsync_freeMessage(&message); - MQTTAsync_free(topicName); + MQTTAsync_freeMessage( &message ); + MQTTAsync_free( topicName ); }); - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - MqttMessage msg(topicName, *message); - cl->pushIncomingEvent([cl, msg]() { cl->onMessageArrivedOnInstance(msg); }); + auto* cl = reinterpret_cast( context ); + MqttMessage msg( topicName, *message ); + cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } ); } return 1; // always return true. Otherwise this callback is triggered again. } // static -void ClientPaho::onDeliveryComplete(void* context, MQTTAsync_token token) +void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token ) { - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - cl->pushIncomingEvent([cl, token]() { cl->onDeliveryCompleteOnInstance(token); }); + auto* cl = reinterpret_cast( context ); + cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } ); } } // static -void ClientPaho::onConnectionLost(void* context, char* cause) +void ClientPaho::onConnectionLost( void* context, char* cause ) { - OSDEV_COMPONENTS_SCOPEGUARD(freeCause, [&cause]() + OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]() { - if (cause) + if( cause ) { - MQTTAsync_free(cause); + MQTTAsync_free( cause ); } }); - if (context) + if( context ) { - auto* cl = reinterpret_cast(context); - std::string msg(nullptr == cause ? "cause unknown" : cause); - cl->pushIncomingEvent([cl, msg]() { cl->onConnectionLostOnInstance(msg); }); + auto* cl = reinterpret_cast( context ); + std::string msg( nullptr == cause ? "cause unknown" : cause ); + cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } ); } } // static -void ClientPaho::onLogPaho(enum MQTTASYNC_TRACE_LEVELS level, char* message) +void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message ) { - (void)message; - switch (level) + (void) message; + switch( level ) { case MQTTASYNC_TRACE_MAXIMUM: case MQTTASYNC_TRACE_MEDIUM: - case MQTTASYNC_TRACE_MINIMUM: { + case MQTTASYNC_TRACE_MINIMUM: + { // ("ClientPaho", "paho - %1", message) break; } - case MQTTASYNC_TRACE_PROTOCOL: { + case MQTTASYNC_TRACE_PROTOCOL: + { // ("ClientPaho", "paho - %1", message) break; } case MQTTASYNC_TRACE_ERROR: case MQTTASYNC_TRACE_SEVERE: - case MQTTASYNC_TRACE_FATAL: { + case MQTTASYNC_TRACE_FATAL: + { // ("ClientPaho", "paho - %1", message) break; } diff --git a/src/connectionstatus.cpp b/src/connectionstatus.cpp index 87fb75b..8e70f9e 100644 --- a/src/connectionstatus.cpp +++ b/src/connectionstatus.cpp @@ -38,5 +38,5 @@ std::ostream& operator<<(std::ostream &os, ConnectionStatus rhs) case ConnectionStatus::Connected: return os << "Connected"; } - return os << "Unknown"; // Should never be reached. + return os << "Unknown"; // "Should" never be reached. ( But guess what? ) } diff --git a/src/log.cpp b/src/log.cpp index a97f151..7feed80 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -53,7 +53,7 @@ int toInt( LogLevel level ) std::string Log::s_context = std::string(); std::string Log::s_fileName = std::string(); -LogLevel Log::s_logLevel = LogLevel::Error; +LogLevel Log::s_logLevel = LogLevel::Debug; LogMask Log::s_logMask = LogMask::None; void Log::init( const std::string& context, const std::string& logFile, LogLevel logDepth ) diff --git a/src/mqttclient.cpp b/src/mqttclient.cpp index ba65159..ad478fb 100644 --- a/src/mqttclient.cpp +++ b/src/mqttclient.cpp @@ -125,7 +125,7 @@ StateEnum MqttClient::state() const return m_serverState.state(); } -void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt ) +void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt, bool blocking ) { osdev::components::mqtt::ParsedUri _endpoint = { { "scheme", "tcp" }, @@ -135,10 +135,10 @@ void MqttClient::connect(const std::string& host, int port, const Credentials &c { "port", std::to_string(port) } }; - this->connect( UriParser::toString( _endpoint ), lwt ); + this->connect( UriParser::toString( _endpoint ), lwt, blocking ); } -void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt ) +void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt, bool blocking ) { LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) ); @@ -171,7 +171,7 @@ void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt ) client = m_principalClient.get(); } - client->connect( true, lwt ); + client->connect( blocking, lwt ); } void MqttClient::disconnect() @@ -254,12 +254,15 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi // throw (?)(MqttException, "Not connected"); return Token(m_clientId, -1); } - if (!m_principalClient->isOverlapping(topic)) { + if (!m_principalClient->isOverlapping(topic)) + { client = m_principalClient.get(); clientFound = true; } - else { - for (const auto& c : m_additionalClients) { + else + { + for (const auto& c : m_additionalClients) + { if (!c->isOverlapping(topic)) { client = c.get(); clientFound = true; @@ -267,7 +270,8 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi } } } - if (!clientFound) { + if (!clientFound) + { LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) ); std::string derivedClientId(generateUniqueClientId(m_clientId, m_additionalClients.size() + 2)); // principal client is nr 1. m_additionalClients.emplace_back(std::make_unique( @@ -278,7 +282,8 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi client = m_additionalClients.back().get(); } } - if (!clientFound) { + if (!clientFound) + { client->connect(true); } return Token{ client->clientId(), client->subscribe(topic, qos, cb) };