From 9421324bbad2b582bdb55f44f8cae91453189b69 Mon Sep 17 00:00:00 2001 From: Peter M. Groen Date: Thu, 30 Jun 2022 21:19:33 +0200 Subject: [PATCH] First fix on connection --- CMakeLists.txt | 1 + examples/connect/CMakeLists.txt | 32 ++++++++++++++++++++++++++++++++ examples/connect/main.cpp | 26 ++++++++++++++++++++++++++ examples/pub/main.cpp | 2 +- examples/sub/main.cpp | 2 +- include/clientpaho.h | 7 +++++++ include/imqttclient.h | 4 ++-- include/mqttclient.h | 4 ++-- src/clientpaho.cpp | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++------ src/connectionstatus.cpp | 2 +- src/log.cpp | 2 +- src/mqttclient.cpp | 23 ++++++++++++++--------- 12 files changed, 139 insertions(+), 23 deletions(-) create mode 100644 examples/connect/CMakeLists.txt create mode 100644 examples/connect/main.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5555d8a..f285463 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,6 +8,7 @@ include(projectheader) project_header(osdev_mqtt) add_subdirectory(src) +add_subdirectory(examples/connect) add_subdirectory(examples/pub) add_subdirectory(examples/sub) diff --git a/examples/connect/CMakeLists.txt b/examples/connect/CMakeLists.txt new file mode 100644 index 0000000..06c38e8 --- /dev/null +++ b/examples/connect/CMakeLists.txt @@ -0,0 +1,32 @@ +cmake_minimum_required(VERSION 3.0) +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake) + +include(projectheader) +project_header(test_connections) + +include_directories( SYSTEM + ${CMAKE_CURRENT_SOURCE_DIR}/../../include +) + +include(compiler) +set(SRC_LIST + ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp +) + +add_executable( ${PROJECT_NAME} + ${SRC_LIST} +) + +target_link_libraries( + ${PROJECT_NAME} + mqtt-cpp +) + +set_target_properties( ${PROJECT_NAME} PROPERTIES + RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin + LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib + ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib +) + +include(installation) +install_application() diff --git a/examples/connect/main.cpp b/examples/connect/main.cpp new file mode 100644 index 0000000..d7c51f1 --- /dev/null +++ b/examples/connect/main.cpp @@ -0,0 +1,26 @@ + +#include +#include + +#include "mqttclient.h" + +using namespace osdev::components::mqtt; + +int main( int argc, char* argv[] ) +{ + (void)argc; + (void)argv; + + MqttClient oClient( "ConnectionTest" ); + oClient.connect( "localhost", 1883, Credentials() ); + + unsigned int nCount = 0; + + while(1) + { + std::cout << "[" << std::to_string(nCount++) << "] MQTT Client status : " << oClient.state() << std::endl; + sleep( 1 ); + } + + return 0; +} diff --git a/examples/pub/main.cpp b/examples/pub/main.cpp index 485d059..bc91d51 100644 --- a/examples/pub/main.cpp +++ b/examples/pub/main.cpp @@ -94,7 +94,7 @@ int main( int argc, char* argv[] ) } messageNumber++; } - sleepcp( 5, T_SECONDS ); + sleepcp( 1, T_SECONDS ); } } else diff --git a/examples/sub/main.cpp b/examples/sub/main.cpp index a9c18fb..c9f1850 100644 --- a/examples/sub/main.cpp +++ b/examples/sub/main.cpp @@ -83,7 +83,7 @@ int main( int argc, char* argv[] ) // Start a loop to give the subscriber the possibility to do its work. while( 1 ) { - sleepcp( 1, T_MICRO ); // Sleep 1 Sec to give the scheduler the change to interfene. + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. } } else diff --git a/include/clientpaho.h b/include/clientpaho.h index cf852a6..2f4fff2 100644 --- a/include/clientpaho.h +++ b/include/clientpaho.h @@ -204,6 +204,12 @@ private: void callbackEventHandler(); /** + * @brief Callback method that is called when a first connect succeeds. + * @param reason Som extra information if there is any. + */ + void onFirstConnectInstance(const std::string &reason); + + /** * @brief Callback method that is called when a reconnect succeeds. * @param cause The cause of the original disconnect. */ @@ -298,6 +304,7 @@ private: void onConnectionLostOnInstance(const std::string& cause); // Static callback functions that are registered on the paho library. Functions call their *OnInstance() counterparts. + static void onFirstConnect(void* context, char* cause); static void onConnect(void* context, char* cause); static void onConnectSuccess(void* context, MQTTAsync_successData* response); static void onConnectFailure(void* context, MQTTAsync_failureData* response); diff --git a/include/imqttclient.h b/include/imqttclient.h index 452fd17..4540909 100644 --- a/include/imqttclient.h +++ b/include/imqttclient.h @@ -60,13 +60,13 @@ public: * @param port The port to use. * @param credentials The credentials to use. */ - virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; + virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0; /** * @brief Connect to the endpoint * @param endpoint an uri endpoint description. */ - virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; + virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0; /** * @brief Disconnect the client from the broker diff --git a/include/mqttclient.h b/include/mqttclient.h index 12908f5..391908a 100644 --- a/include/mqttclient.h +++ b/include/mqttclient.h @@ -92,12 +92,12 @@ public: /** * @see IMqttClient */ - virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT() ) override; + virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override; /** * @see IMqttClient */ - virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) override; + virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override; /** * @see IMqttClient diff --git a/src/clientpaho.cpp b/src/clientpaho.cpp index 47bd9b9..a52453f 100644 --- a/src/clientpaho.cpp +++ b/src/clientpaho.cpp @@ -24,6 +24,7 @@ #include "errorcode.h" #include "mqttutil.h" #include "lockguard.h" +#include "log.h" #include "metaprogrammingdefs.h" #include "mqttstream.h" #include "scopeguard.h" @@ -131,6 +132,18 @@ ClientPaho::ClientPaho(const std::string& _endpoint, if (MQTTASYNC_SUCCESS == rc) { MQTTAsync_setCallbacks(m_client, reinterpret_cast(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete); + LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback.") ); + /* + auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast(this), ClientPaho::onConnect ); + if( MQTTASYNC_SUCCESS == ccb ) + { + LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") ); + } + else + { + LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") ); + } + */ m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this); } else @@ -194,7 +207,7 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) } MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; - conn_opts.keepAliveInterval = 20; + conn_opts.keepAliveInterval = 5; conn_opts.cleansession = 1; conn_opts.onSuccess = &ClientPaho::onConnectSuccess; conn_opts.onFailure = &ClientPaho::onConnectFailure; @@ -739,6 +752,22 @@ void ClientPaho::callbackEventHandler() // ("ClientPaho", "%1 - leaving callback event handler", m_clientId); } +void ClientPaho::onFirstConnectInstance(const std::string &reason) +{ + (void)reason; + { + OSDEV_COMPONENTS_LOCKGUARD(m_mutex); + // Register the connect callback that is used in reconnect scenarios. + auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect); + if (MQTTASYNC_SUCCESS != rc) + { + LogError( "[ClientPaho]", std::string( "onFirstConnectInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); + } + } + + setConnectionStatus(ConnectionStatus::Connected); +} + void ClientPaho::onConnectOnInstance(const std::string& cause) { (void)cause; @@ -756,15 +785,15 @@ void ClientPaho::onConnectOnInstance(const std::string& cause) void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response) { auto connectData = response.connectionData(); - // ("ClientPaho", "onConnectSuccessOnInstance %1 - connected to endpoint %2 (mqtt version %3, session present %4)", - // m_clientId, connectData.serverUri(), connectData.mqttVersion(), connectData.sessionPresent()); + LogDebug( "[ClientPaho]", std::string( "onConnectSuccessOnInstance " + m_clientId + " - connected to endpoint " + connectData.serverUri() + + " (mqtt version " + std::to_string( connectData.mqttVersion() ) + ", session present " + ( connectData.sessionPresent() ? "TRUE" : "FALSE" ) + " )" ) ); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // Register the connect callback that is used in reconnect scenarios. auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect); if (MQTTASYNC_SUCCESS != rc) { - // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the connected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); + LogError( "[ClientPaho]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); } // For MQTTV5 //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect); @@ -775,9 +804,11 @@ void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response) m_operationResult[-100] = true; m_pendingOperations.erase(-100); } + setConnectionStatus(ConnectionStatus::Connected); - if (m_connectPromise) + if(m_connectPromise) { + LogDebug( "[ClientPaho]", std::string("connectPromise still present. Resetting!") ); m_connectPromise->set_value(); } m_operationsCompleteCV.notify_all(); @@ -786,7 +817,7 @@ void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response) void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) { (void)response; - // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); + LogDebug("ClientPaho", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")")); { OSDEV_COMPONENTS_LOCKGUARD(m_mutex); // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); @@ -1077,8 +1108,20 @@ void ClientPaho::onConnectionLostOnInstance(const std::string& cause) } // static +void ClientPaho::onFirstConnect(void* context, char* cause) +{ + LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." ); + if(context) + { + auto *cl = reinterpret_cast(context); + std::string reason(nullptr == cause ? "Unknown cause" : cause); + cl->pushIncomingEvent([cl, reason]() { cl->onFirstConnectInstance(reason); }); + } +} + void ClientPaho::onConnect(void* context, char* cause) { + LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." ); if (context) { auto* cl = reinterpret_cast(context); @@ -1090,6 +1133,7 @@ void ClientPaho::onConnect(void* context, char* cause) // static void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response) { + LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSucces triggered.." ); if (context) { auto* cl = reinterpret_cast(context); @@ -1147,6 +1191,7 @@ void ClientPaho::onDisconnectSuccess(void* context, MQTTAsync_successData* respo // static void ClientPaho::onDisconnectFailure(void* context, MQTTAsync_failureData* response) { + LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." ); if (context) { auto* cl = reinterpret_cast(context); diff --git a/src/connectionstatus.cpp b/src/connectionstatus.cpp index 87fb75b..8e70f9e 100644 --- a/src/connectionstatus.cpp +++ b/src/connectionstatus.cpp @@ -38,5 +38,5 @@ std::ostream& operator<<(std::ostream &os, ConnectionStatus rhs) case ConnectionStatus::Connected: return os << "Connected"; } - return os << "Unknown"; // Should never be reached. + return os << "Unknown"; // "Should" never be reached. ( But guess what? ) } diff --git a/src/log.cpp b/src/log.cpp index a97f151..7feed80 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -53,7 +53,7 @@ int toInt( LogLevel level ) std::string Log::s_context = std::string(); std::string Log::s_fileName = std::string(); -LogLevel Log::s_logLevel = LogLevel::Error; +LogLevel Log::s_logLevel = LogLevel::Debug; LogMask Log::s_logMask = LogMask::None; void Log::init( const std::string& context, const std::string& logFile, LogLevel logDepth ) diff --git a/src/mqttclient.cpp b/src/mqttclient.cpp index ba65159..ad478fb 100644 --- a/src/mqttclient.cpp +++ b/src/mqttclient.cpp @@ -125,7 +125,7 @@ StateEnum MqttClient::state() const return m_serverState.state(); } -void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt ) +void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt, bool blocking ) { osdev::components::mqtt::ParsedUri _endpoint = { { "scheme", "tcp" }, @@ -135,10 +135,10 @@ void MqttClient::connect(const std::string& host, int port, const Credentials &c { "port", std::to_string(port) } }; - this->connect( UriParser::toString( _endpoint ), lwt ); + this->connect( UriParser::toString( _endpoint ), lwt, blocking ); } -void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt ) +void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt, bool blocking ) { LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) ); @@ -171,7 +171,7 @@ void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt ) client = m_principalClient.get(); } - client->connect( true, lwt ); + client->connect( blocking, lwt ); } void MqttClient::disconnect() @@ -254,12 +254,15 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi // throw (?)(MqttException, "Not connected"); return Token(m_clientId, -1); } - if (!m_principalClient->isOverlapping(topic)) { + if (!m_principalClient->isOverlapping(topic)) + { client = m_principalClient.get(); clientFound = true; } - else { - for (const auto& c : m_additionalClients) { + else + { + for (const auto& c : m_additionalClients) + { if (!c->isOverlapping(topic)) { client = c.get(); clientFound = true; @@ -267,7 +270,8 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi } } } - if (!clientFound) { + if (!clientFound) + { LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) ); std::string derivedClientId(generateUniqueClientId(m_clientId, m_additionalClients.size() + 2)); // principal client is nr 1. m_additionalClients.emplace_back(std::make_unique( @@ -278,7 +282,8 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi client = m_additionalClients.back().get(); } } - if (!clientFound) { + if (!clientFound) + { client->connect(true); } return Token{ client->clientId(), client->subscribe(topic, qos, cb) }; -- libgit2 0.21.4