Commit 9421324bbad2b582bdb55f44f8cae91453189b69

Authored by Peter M. Groen
1 parent 750b542a

First fix on connection

CMakeLists.txt
... ... @@ -8,6 +8,7 @@ include(projectheader)
8 8 project_header(osdev_mqtt)
9 9  
10 10 add_subdirectory(src)
  11 +add_subdirectory(examples/connect)
11 12 add_subdirectory(examples/pub)
12 13 add_subdirectory(examples/sub)
13 14  
... ...
examples/connect/CMakeLists.txt 0 → 100644
  1 +cmake_minimum_required(VERSION 3.0)
  2 +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake)
  3 +
  4 +include(projectheader)
  5 +project_header(test_connections)
  6 +
  7 +include_directories( SYSTEM
  8 + ${CMAKE_CURRENT_SOURCE_DIR}/../../include
  9 +)
  10 +
  11 +include(compiler)
  12 +set(SRC_LIST
  13 + ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
  14 +)
  15 +
  16 +add_executable( ${PROJECT_NAME}
  17 + ${SRC_LIST}
  18 +)
  19 +
  20 +target_link_libraries(
  21 + ${PROJECT_NAME}
  22 + mqtt-cpp
  23 +)
  24 +
  25 +set_target_properties( ${PROJECT_NAME} PROPERTIES
  26 + RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
  27 + LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib
  28 + ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib
  29 +)
  30 +
  31 +include(installation)
  32 +install_application()
... ...
examples/connect/main.cpp 0 → 100644
  1 +
  2 +#include <iostream>
  3 +#include <unistd.h>
  4 +
  5 +#include "mqttclient.h"
  6 +
  7 +using namespace osdev::components::mqtt;
  8 +
  9 +int main( int argc, char* argv[] )
  10 +{
  11 + (void)argc;
  12 + (void)argv;
  13 +
  14 + MqttClient oClient( "ConnectionTest" );
  15 + oClient.connect( "localhost", 1883, Credentials() );
  16 +
  17 + unsigned int nCount = 0;
  18 +
  19 + while(1)
  20 + {
  21 + std::cout << "[" << std::to_string(nCount++) << "] MQTT Client status : " << oClient.state() << std::endl;
  22 + sleep( 1 );
  23 + }
  24 +
  25 + return 0;
  26 +}
... ...
examples/pub/main.cpp
... ... @@ -94,7 +94,7 @@ int main( int argc, char* argv[] )
94 94 }
95 95 messageNumber++;
96 96 }
97   - sleepcp( 5, T_SECONDS );
  97 + sleepcp( 1, T_SECONDS );
98 98 }
99 99 }
100 100 else
... ...
examples/sub/main.cpp
... ... @@ -83,7 +83,7 @@ int main( int argc, char* argv[] )
83 83 // Start a loop to give the subscriber the possibility to do its work.
84 84 while( 1 )
85 85 {
86   - sleepcp( 1, T_MICRO ); // Sleep 1 Sec to give the scheduler the change to interfene.
  86 + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
87 87 }
88 88 }
89 89 else
... ...
include/clientpaho.h
... ... @@ -204,6 +204,12 @@ private:
204 204 void callbackEventHandler();
205 205  
206 206 /**
  207 + * @brief Callback method that is called when a first connect succeeds.
  208 + * @param reason Som extra information if there is any.
  209 + */
  210 + void onFirstConnectInstance(const std::string &reason);
  211 +
  212 + /**
207 213 * @brief Callback method that is called when a reconnect succeeds.
208 214 * @param cause The cause of the original disconnect.
209 215 */
... ... @@ -298,6 +304,7 @@ private:
298 304 void onConnectionLostOnInstance(const std::string& cause);
299 305  
300 306 // Static callback functions that are registered on the paho library. Functions call their *OnInstance() counterparts.
  307 + static void onFirstConnect(void* context, char* cause);
301 308 static void onConnect(void* context, char* cause);
302 309 static void onConnectSuccess(void* context, MQTTAsync_successData* response);
303 310 static void onConnectFailure(void* context, MQTTAsync_failureData* response);
... ...
include/imqttclient.h
... ... @@ -60,13 +60,13 @@ public:
60 60 * @param port The port to use.
61 61 * @param credentials The credentials to use.
62 62 */
63   - virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT() ) = 0;
  63 + virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0;
64 64  
65 65 /**
66 66 * @brief Connect to the endpoint
67 67 * @param endpoint an uri endpoint description.
68 68 */
69   - virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) = 0;
  69 + virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0;
70 70  
71 71 /**
72 72 * @brief Disconnect the client from the broker
... ...
include/mqttclient.h
... ... @@ -92,12 +92,12 @@ public:
92 92 /**
93 93 * @see IMqttClient
94 94 */
95   - virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT() ) override;
  95 + virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override;
96 96  
97 97 /**
98 98 * @see IMqttClient
99 99 */
100   - virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) override;
  100 + virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override;
101 101  
102 102 /**
103 103 * @see IMqttClient
... ...
src/clientpaho.cpp
... ... @@ -24,6 +24,7 @@
24 24 #include "errorcode.h"
25 25 #include "mqttutil.h"
26 26 #include "lockguard.h"
  27 +#include "log.h"
27 28 #include "metaprogrammingdefs.h"
28 29 #include "mqttstream.h"
29 30 #include "scopeguard.h"
... ... @@ -131,6 +132,18 @@ ClientPaho::ClientPaho(const std::string&amp; _endpoint,
131 132 if (MQTTASYNC_SUCCESS == rc)
132 133 {
133 134 MQTTAsync_setCallbacks(m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete);
  135 + LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback.") );
  136 + /*
  137 + auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onConnect );
  138 + if( MQTTASYNC_SUCCESS == ccb )
  139 + {
  140 + LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
  141 + }
  142 + else
  143 + {
  144 + LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
  145 + }
  146 + */
134 147 m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this);
135 148 }
136 149 else
... ... @@ -194,7 +207,7 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
194 207 }
195 208  
196 209 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
197   - conn_opts.keepAliveInterval = 20;
  210 + conn_opts.keepAliveInterval = 5;
198 211 conn_opts.cleansession = 1;
199 212 conn_opts.onSuccess = &ClientPaho::onConnectSuccess;
200 213 conn_opts.onFailure = &ClientPaho::onConnectFailure;
... ... @@ -739,6 +752,22 @@ void ClientPaho::callbackEventHandler()
739 752 // ("ClientPaho", "%1 - leaving callback event handler", m_clientId);
740 753 }
741 754  
  755 +void ClientPaho::onFirstConnectInstance(const std::string &reason)
  756 +{
  757 + (void)reason;
  758 + {
  759 + OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  760 + // Register the connect callback that is used in reconnect scenarios.
  761 + auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect);
  762 + if (MQTTASYNC_SUCCESS != rc)
  763 + {
  764 + LogError( "[ClientPaho]", std::string( "onFirstConnectInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
  765 + }
  766 + }
  767 +
  768 + setConnectionStatus(ConnectionStatus::Connected);
  769 +}
  770 +
742 771 void ClientPaho::onConnectOnInstance(const std::string& cause)
743 772 {
744 773 (void)cause;
... ... @@ -756,15 +785,15 @@ void ClientPaho::onConnectOnInstance(const std::string&amp; cause)
756 785 void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response)
757 786 {
758 787 auto connectData = response.connectionData();
759   - // ("ClientPaho", "onConnectSuccessOnInstance %1 - connected to endpoint %2 (mqtt version %3, session present %4)",
760   - // m_clientId, connectData.serverUri(), connectData.mqttVersion(), connectData.sessionPresent());
  788 + LogDebug( "[ClientPaho]", std::string( "onConnectSuccessOnInstance " + m_clientId + " - connected to endpoint " + connectData.serverUri() +
  789 + " (mqtt version " + std::to_string( connectData.mqttVersion() ) + ", session present " + ( connectData.sessionPresent() ? "TRUE" : "FALSE" ) + " )" ) );
761 790 {
762 791 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
763 792 // Register the connect callback that is used in reconnect scenarios.
764 793 auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect);
765 794 if (MQTTASYNC_SUCCESS != rc)
766 795 {
767   - // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the connected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
  796 + LogError( "[ClientPaho]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
768 797 }
769 798 // For MQTTV5
770 799 //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
... ... @@ -775,9 +804,11 @@ void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess&amp; response)
775 804 m_operationResult[-100] = true;
776 805 m_pendingOperations.erase(-100);
777 806 }
  807 +
778 808 setConnectionStatus(ConnectionStatus::Connected);
779   - if (m_connectPromise)
  809 + if(m_connectPromise)
780 810 {
  811 + LogDebug( "[ClientPaho]", std::string("connectPromise still present. Resetting!") );
781 812 m_connectPromise->set_value();
782 813 }
783 814 m_operationsCompleteCV.notify_all();
... ... @@ -786,7 +817,7 @@ void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess&amp; response)
786 817 void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response)
787 818 {
788 819 (void)response;
789   - // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
  820 + LogDebug("ClientPaho", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
790 821 {
791 822 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
792 823 // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
... ... @@ -1077,8 +1108,20 @@ void ClientPaho::onConnectionLostOnInstance(const std::string&amp; cause)
1077 1108 }
1078 1109  
1079 1110 // static
  1111 +void ClientPaho::onFirstConnect(void* context, char* cause)
  1112 +{
  1113 + LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." );
  1114 + if(context)
  1115 + {
  1116 + auto *cl = reinterpret_cast<ClientPaho*>(context);
  1117 + std::string reason(nullptr == cause ? "Unknown cause" : cause);
  1118 + cl->pushIncomingEvent([cl, reason]() { cl->onFirstConnectInstance(reason); });
  1119 + }
  1120 +}
  1121 +
1080 1122 void ClientPaho::onConnect(void* context, char* cause)
1081 1123 {
  1124 + LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." );
1082 1125 if (context)
1083 1126 {
1084 1127 auto* cl = reinterpret_cast<ClientPaho*>(context);
... ... @@ -1090,6 +1133,7 @@ void ClientPaho::onConnect(void* context, char* cause)
1090 1133 // static
1091 1134 void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response)
1092 1135 {
  1136 + LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSucces triggered.." );
1093 1137 if (context)
1094 1138 {
1095 1139 auto* cl = reinterpret_cast<ClientPaho*>(context);
... ... @@ -1147,6 +1191,7 @@ void ClientPaho::onDisconnectSuccess(void* context, MQTTAsync_successData* respo
1147 1191 // static
1148 1192 void ClientPaho::onDisconnectFailure(void* context, MQTTAsync_failureData* response)
1149 1193 {
  1194 + LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." );
1150 1195 if (context)
1151 1196 {
1152 1197 auto* cl = reinterpret_cast<ClientPaho*>(context);
... ...
src/connectionstatus.cpp
... ... @@ -38,5 +38,5 @@ std::ostream&amp; operator&lt;&lt;(std::ostream &amp;os, ConnectionStatus rhs)
38 38 case ConnectionStatus::Connected:
39 39 return os << "Connected";
40 40 }
41   - return os << "Unknown"; // Should never be reached.
  41 + return os << "Unknown"; // "Should" never be reached. ( But guess what? )
42 42 }
... ...
src/log.cpp
... ... @@ -53,7 +53,7 @@ int toInt( LogLevel level )
53 53  
54 54 std::string Log::s_context = std::string();
55 55 std::string Log::s_fileName = std::string();
56   -LogLevel Log::s_logLevel = LogLevel::Error;
  56 +LogLevel Log::s_logLevel = LogLevel::Debug;
57 57 LogMask Log::s_logMask = LogMask::None;
58 58  
59 59 void Log::init( const std::string& context, const std::string& logFile, LogLevel logDepth )
... ...
src/mqttclient.cpp
... ... @@ -125,7 +125,7 @@ StateEnum MqttClient::state() const
125 125 return m_serverState.state();
126 126 }
127 127  
128   -void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt )
  128 +void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt, bool blocking )
129 129 {
130 130 osdev::components::mqtt::ParsedUri _endpoint = {
131 131 { "scheme", "tcp" },
... ... @@ -135,10 +135,10 @@ void MqttClient::connect(const std::string&amp; host, int port, const Credentials &amp;c
135 135 { "port", std::to_string(port) }
136 136 };
137 137  
138   - this->connect( UriParser::toString( _endpoint ), lwt );
  138 + this->connect( UriParser::toString( _endpoint ), lwt, blocking );
139 139 }
140 140  
141   -void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt )
  141 +void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt, bool blocking )
142 142 {
143 143 LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) );
144 144  
... ... @@ -171,7 +171,7 @@ void MqttClient::connect( const std::string &amp;_endpoint, const mqtt_LWT &amp;lwt )
171 171 client = m_principalClient.get();
172 172 }
173 173  
174   - client->connect( true, lwt );
  174 + client->connect( blocking, lwt );
175 175 }
176 176  
177 177 void MqttClient::disconnect()
... ... @@ -254,12 +254,15 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
254 254 // throw (?)(MqttException, "Not connected");
255 255 return Token(m_clientId, -1);
256 256 }
257   - if (!m_principalClient->isOverlapping(topic)) {
  257 + if (!m_principalClient->isOverlapping(topic))
  258 + {
258 259 client = m_principalClient.get();
259 260 clientFound = true;
260 261 }
261   - else {
262   - for (const auto& c : m_additionalClients) {
  262 + else
  263 + {
  264 + for (const auto& c : m_additionalClients)
  265 + {
263 266 if (!c->isOverlapping(topic)) {
264 267 client = c.get();
265 268 clientFound = true;
... ... @@ -267,7 +270,8 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
267 270 }
268 271 }
269 272 }
270   - if (!clientFound) {
  273 + if (!clientFound)
  274 + {
271 275 LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) );
272 276 std::string derivedClientId(generateUniqueClientId(m_clientId, m_additionalClients.size() + 2)); // principal client is nr 1.
273 277 m_additionalClients.emplace_back(std::make_unique<ClientPaho>(
... ... @@ -278,7 +282,8 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
278 282 client = m_additionalClients.back().get();
279 283 }
280 284 }
281   - if (!clientFound) {
  285 + if (!clientFound)
  286 + {
282 287 client->connect(true);
283 288 }
284 289 return Token{ client->clientId(), client->subscribe(topic, qos, cb) };
... ...