Merged
Merge Request #11 · created by Steven de Ridder


Fix/pgroen/deferred connection

converted all logs to log.h format, except a few commented lines containing std::set params.
also made a number of syntax fixes and ran through the connection scenarios again, which are all working correctly now.


From fix/pgroen/deferred_connection into development

Merged by Peter M. Groen

Source branch has been removed
2 participants





CMakeLists.txt
... ... @@ -8,6 +8,7 @@ include(projectheader)
8 8 project_header(osdev_mqtt)
9 9  
10 10 add_subdirectory(src)
  11 +add_subdirectory(examples/connect)
11 12 add_subdirectory(examples/pub)
12 13 add_subdirectory(examples/sub)
13 14  
... ...
debug_log.txt 0 → 100644
  1 +With broker present
  2 +
  3 +Jul 02 00:50:19 intelnuc64.osdev.nl test_connection[30797]: MQTT Client started[30797]: [MqttClient::MqttClient]
  4 +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::eventHandler][30797]: ConnectionTest - starting event handler.
  5 +Jul 02 00:50:19 intelnuc64.osdev.nl MqttClient[30797]: ConnectionTest - Request connect
  6 +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho][30797]: ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - Setting the extra onConnected callback.
  7 +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][30797]: ConnectionTest - connection status of wrapped client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a changed to 2
  8 +Jul 02 00:50:19 intelnuc64.osdev.nl ClientPaho[30797]: ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - starting callback event handler
  9 +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho::onConnectSuccess][30797]: onConnectSucces triggered..
  10 +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho][30797]: onConnectSuccessOnInstance ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - connected to endpoint localhost:1883 (mqtt version 4, session present FALSE )
  11 +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][30797]: ConnectionTest - connection status of wrapped client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a changed to 4
  12 +Jul 02 00:50:30 intelnuc64.osdev.nl mosquitto[29463]: 1656715830: Client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a closed its connection.
  13 +
  14 +
  15 +Without a broker present
  16 +
  17 +Jul 02 00:55:33 intelnuc64.osdev.nl test_connection[31574]: MQTT Client started[31574]: [MqttClient::MqttClient]
  18 +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::eventHandler][31574]: ConnectionTest - starting event handler.
  19 +Jul 02 00:55:33 intelnuc64.osdev.nl MqttClient[31574]: ConnectionTest - Request connect
  20 +Jul 02 00:55:33 intelnuc64.osdev.nl [ClientPaho][31574]: ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - Setting the extra onConnected callback.
  21 +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][31574]: ConnectionTest - connection status of wrapped client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 changed to 2
  22 +Jul 02 00:55:33 intelnuc64.osdev.nl ClientPaho[31574]: ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - starting callback event handler
  23 +Jul 02 00:55:33 intelnuc64.osdev.nl ClientPaho[31574]: onConnectFailureOnInstanceConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - connection failed with code MQTTASYNC_FAILURE (TCP/TLS connect failure)
  24 +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][31574]: ConnectionTest - connection status of wrapped client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 changed to 0
  25 +
  26 +Jul 02 00:55:42 intelnuc64.osdev.nl systemd[1]: Starting Mosquitto MQTT Broker...
  27 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: mosquitto version 2.0.14 starting
  28 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Config loaded from /etc/mosquitto/mosquitto.conf.
  29 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Starting in local only mode. Connections will only be possible from clients running on this machine.
  30 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Create a configuration file which defines a listener to allow remote access.
  31 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: For more details see https://mosquitto.org/documentation/authentication-methods/
  32 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Opening ipv4 listen socket on port 1883.
  33 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Opening ipv6 listen socket on port 1883.
  34 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: mosquitto version 2.0.14 running
  35 +Jul 02 00:55:45 intelnuc64.osdev.nl mosquitto[31610]: 1656716145: New connection from 127.0.0.1:42196 on port 1883.
  36 +Jul 02 00:55:45 intelnuc64.osdev.nl mosquitto[31610]: 1656716145: New client connected from 127.0.0.1:42196 as ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 (p2, c1, k5).
  37 +Jul 02 00:55:53 intelnuc64.osdev.nl mosquitto[31610]: 1656716153: Client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 closed its connection.
  38 +
  39 +
... ...
examples/connect/CMakeLists.txt 0 → 100644
  1 +cmake_minimum_required(VERSION 3.0)
  2 +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake)
  3 +
  4 +include(projectheader)
  5 +project_header(test_connections)
  6 +
  7 +include_directories( SYSTEM
  8 + ${CMAKE_CURRENT_SOURCE_DIR}/../../include
  9 +)
  10 +
  11 +include(compiler)
  12 +set(SRC_LIST
  13 + ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
  14 +)
  15 +
  16 +add_executable( ${PROJECT_NAME}
  17 + ${SRC_LIST}
  18 +)
  19 +
  20 +target_link_libraries(
  21 + ${PROJECT_NAME}
  22 + mqtt-cpp
  23 +)
  24 +
  25 +set_target_properties( ${PROJECT_NAME} PROPERTIES
  26 + RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
  27 + LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib
  28 + ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib
  29 +)
  30 +
  31 +include(installation)
  32 +install_application()
... ...
examples/connect/main.cpp 0 → 100644
  1 +
  2 +#include <iostream>
  3 +#include <unistd.h>
  4 +
  5 +#include "mqttclient.h"
  6 +
  7 +using namespace osdev::components::mqtt;
  8 +
  9 +int main( int argc, char* argv[] )
  10 +{
  11 + (void)argc;
  12 + (void)argv;
  13 +
  14 + MqttClient oClient( "ConnectionTest" );
  15 + oClient.connect( "localhost", 1883, Credentials() );
  16 +
  17 + unsigned int nCount = 0;
  18 +
  19 + while(1)
  20 + {
  21 + std::cout << "[" << std::to_string(nCount++) << "] MQTT Client status : " << oClient.state() << std::endl;
  22 + sleep( 1 );
  23 + }
  24 +
  25 + return 0;
  26 +}
... ...
examples/pub/main.cpp
... ... @@ -94,7 +94,7 @@ int main( int argc, char* argv[] )
94 94 }
95 95 messageNumber++;
96 96 }
97   - sleepcp( 5, T_SECONDS );
  97 + sleepcp( 1, T_SECONDS );
98 98 }
99 99 }
100 100 else
... ...
examples/sub/main.cpp
... ... @@ -83,7 +83,7 @@ int main( int argc, char* argv[] )
83 83 // Start a loop to give the subscriber the possibility to do its work.
84 84 while( 1 )
85 85 {
86   - sleepcp( 1, T_MICRO ); // Sleep 1 Sec to give the scheduler the change to interfene.
  86 + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
87 87 }
88 88 }
89 89 else
... ...
include/clientpaho.h
... ... @@ -215,7 +215,7 @@ private:
215 215 * The connection status is set to Connected.
216 216 * @param response A success response with connection data.
217 217 */
218   - void onConnectSuccessOnInstance(const MqttSuccess& response);
  218 + void onConnectSuccessOnInstance();
219 219  
220 220 /**
221 221 * @brief Callback that is called when a connect call fails after being sent to the endpoint.
... ... @@ -298,6 +298,7 @@ private:
298 298 void onConnectionLostOnInstance(const std::string& cause);
299 299  
300 300 // Static callback functions that are registered on the paho library. Functions call their *OnInstance() counterparts.
  301 + static void onFirstConnect(void* context, char* cause);
301 302 static void onConnect(void* context, char* cause);
302 303 static void onConnectSuccess(void* context, MQTTAsync_successData* response);
303 304 static void onConnectFailure(void* context, MQTTAsync_failureData* response);
... ...
include/imqttclient.h
... ... @@ -60,13 +60,13 @@ public:
60 60 * @param port The port to use.
61 61 * @param credentials The credentials to use.
62 62 */
63   - virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT() ) = 0;
  63 + virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0;
64 64  
65 65 /**
66 66 * @brief Connect to the endpoint
67 67 * @param endpoint an uri endpoint description.
68 68 */
69   - virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) = 0;
  69 + virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0;
70 70  
71 71 /**
72 72 * @brief Disconnect the client from the broker
... ...
include/mqttclient.h
... ... @@ -92,12 +92,12 @@ public:
92 92 /**
93 93 * @see IMqttClient
94 94 */
95   - virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT() ) override;
  95 + virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override;
96 96  
97 97 /**
98 98 * @see IMqttClient
99 99 */
100   - virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) override;
  100 + virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override;
101 101  
102 102 /**
103 103 * @see IMqttClient
... ...
src/clientpaho.cpp
... ... @@ -24,6 +24,7 @@
24 24 #include "errorcode.h"
25 25 #include "mqttutil.h"
26 26 #include "lockguard.h"
  27 +#include "log.h"
27 28 #include "metaprogrammingdefs.h"
28 29 #include "mqttstream.h"
29 30 #include "scopeguard.h"
... ... @@ -93,10 +94,10 @@ struct Init
93 94  
94 95 std::atomic_int ClientPaho::s_numberOfInstances(0);
95 96  
96   -ClientPaho::ClientPaho(const std::string& _endpoint,
  97 +ClientPaho::ClientPaho( const std::string& _endpoint,
97 98 const std::string& _id,
98   - const std::function<void(const std::string&, ConnectionStatus)>& connectionStatusCallback,
99   - const std::function<void(const std::string& clientId, std::int32_t pubMsgToken)>& deliveryCompleteCallback)
  99 + const std::function<void( const std::string&, ConnectionStatus )>& connectionStatusCallback,
  100 + const std::function<void( const std::string& clientId, std::int32_t pubMsgToken )>& deliveryCompleteCallback )
100 101 : m_mutex()
101 102 , m_endpoint()
102 103 , m_username()
... ... @@ -122,51 +123,55 @@ ClientPaho::ClientPaho(const std::string&amp; _endpoint,
122 123 , m_callbackEventQueue(m_clientId)
123 124 , m_workerThread()
124 125 {
125   - if (0 == s_numberOfInstances++) {
126   - MQTTAsync_setTraceCallback(&ClientPaho::onLogPaho);
  126 + if( 0 == s_numberOfInstances++ )
  127 + {
  128 + MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho );
127 129 }
128   - // MLOGIC_COMMON_DEBUG("ClientPaho", "%1 - ctor ClientPaho %2", m_clientId, this);
  130 +
  131 + LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) );
129 132 parseEndpoint(_endpoint);
130   - auto rc = MQTTAsync_create(&m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
131   - if (MQTTASYNC_SUCCESS == rc)
  133 +
  134 + auto rc = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr );
  135 + if( MQTTASYNC_SUCCESS == rc )
132 136 {
133   - MQTTAsync_setCallbacks(m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete);
134   - m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this);
  137 + MQTTAsync_setCallbacks( m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete );
  138 + m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this );
135 139 }
136 140 else
137 141 {
138   - // Do something sensible here.
  142 + LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( rc ) ) );
139 143 }
140 144 }
141 145  
142 146 ClientPaho::~ClientPaho()
143 147 {
  148 + LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) );
144 149 if( MQTTAsync_isConnected( m_client ) )
145 150 {
146 151 this->unsubscribeAll();
147 152  
148   - this->waitForCompletion(std::chrono::milliseconds(2000), std::set<int32_t>{});
149   - this->disconnect(true, 5000);
  153 + this->waitForCompletion( std::chrono::milliseconds(2000), std::set<int32_t>{} );
  154 + this->disconnect( true, 5000 );
150 155 }
151 156 else
152 157 {
153 158 // If the status was already disconnected this call does nothing
154   - setConnectionStatus(ConnectionStatus::Disconnected);
  159 + setConnectionStatus( ConnectionStatus::Disconnected );
155 160 }
156 161  
157   - if (0 == --s_numberOfInstances)
  162 + if( 0 == --s_numberOfInstances )
158 163 {
159 164 // encountered a case where termination of the logging system within paho led to a segfault.
160 165 // This was a paho thread that was cleaned while at the same time the logging system was terminated.
161 166 // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less
162 167 // frequently.
163   - MQTTAsync_setTraceCallback(nullptr);
  168 + MQTTAsync_setTraceCallback( nullptr );
164 169 }
165 170  
166   - MQTTAsync_destroy(&m_client);
  171 + MQTTAsync_destroy( &m_client );
167 172  
168 173 m_callbackEventQueue.stop();
169   - if (m_workerThread.joinable())
  174 + if( m_workerThread.joinable() )
170 175 {
171 176 m_workerThread.join();
172 177 }
... ... @@ -193,14 +198,28 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
193 198 setConnectionStatus( ConnectionStatus::ConnectInProgress );
194 199 }
195 200  
  201 + LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) );
  202 +
196 203 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
197   - conn_opts.keepAliveInterval = 20;
  204 + conn_opts.keepAliveInterval = 5;
198 205 conn_opts.cleansession = 1;
199   - conn_opts.onSuccess = &ClientPaho::onConnectSuccess;
  206 + conn_opts.onSuccess = nullptr;
200 207 conn_opts.onFailure = &ClientPaho::onConnectFailure;
201 208 conn_opts.context = this;
202 209 conn_opts.automaticReconnect = 1;
203 210  
  211 + // Make sure we get a signal if the promise is fulfilled
  212 + auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect );
  213 + if( MQTTASYNC_SUCCESS == ccb )
  214 + {
  215 + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
  216 + }
  217 + else
  218 + {
  219 + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
  220 + }
  221 +
  222 + // Setup the last will and testament, if so desired.
204 223 if( !lwt.topic().empty() )
205 224 {
206 225 MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
... ... @@ -208,13 +227,14 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
208 227 will_opts.topicName = lwt.topic().c_str();
209 228  
210 229 conn_opts.will = &will_opts;
  230 +
  231 + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) );
211 232 }
212 233 else
213 234 {
214 235 conn_opts.will = nullptr;
215 236 }
216 237  
217   -
218 238 if( !m_username.empty() )
219 239 {
220 240 conn_opts.username = m_username.c_str();
... ... @@ -259,18 +279,19 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
259 279 return -100;
260 280 }
261 281  
262   -std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
  282 +std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
263 283 {
264 284 ConnectionStatus currentStatus = m_connectionStatus;
265 285  
266 286 {
267 287 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
268   - if (ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus) {
  288 + if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus )
  289 + {
269 290 return -1;
270 291 }
271 292  
272 293 currentStatus = m_connectionStatus;
273   - setConnectionStatus(ConnectionStatus::DisconnectInProgress);
  294 + setConnectionStatus( ConnectionStatus::DisconnectInProgress );
274 295 }
275 296  
276 297 MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize();
... ... @@ -282,43 +303,45 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
282 303 std::promise<void> waitForDisconnectPromise{};
283 304 auto waitForDisconnect = waitForDisconnectPromise.get_future();
284 305 m_disconnectPromise.reset();
285   - if (wait) {
  306 + if( wait )
  307 + {
286 308 m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise));
287 309 }
288 310  
289 311 {
290 312 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
291   - if (!m_pendingOperations.insert(-200).second)
  313 + if( !m_pendingOperations.insert( -200 ).second )
292 314 {
293   - // "ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200)
  315 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) );
294 316 }
295   - m_operationResult.erase(-200);
  317 + m_operationResult.erase( -200 );
296 318 }
297 319  
298   - int rc = MQTTAsync_disconnect(m_client, &disconn_opts);
299   - if (MQTTASYNC_SUCCESS != rc) {
300   - if (MQTTASYNC_DISCONNECTED == rc) {
  320 + int rc = MQTTAsync_disconnect( m_client, &disconn_opts );
  321 + if( MQTTASYNC_SUCCESS != rc )
  322 + {
  323 + if( MQTTASYNC_DISCONNECTED == rc )
  324 + {
301 325 currentStatus = ConnectionStatus::Disconnected;
302 326 }
303 327  
304   - setConnectionStatus(currentStatus);
305   - OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  328 + setConnectionStatus( currentStatus );
  329 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
306 330 m_operationResult[-200] = false;
307   - m_pendingOperations.erase(-200);
  331 + m_pendingOperations.erase( -200 );
308 332  
309   - if (MQTTASYNC_DISCONNECTED == rc)
  333 + if( MQTTASYNC_DISCONNECTED == rc )
310 334 {
311 335 return -1;
312 336 }
313   - // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
  337 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) );
314 338 }
315 339  
316 340 if( wait )
317 341 {
318 342 if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
319 343 {
320   - // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId);
321   -
  344 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) );
322 345 }
323 346 waitForDisconnect.get();
324 347 m_disconnectPromise.reset();
... ... @@ -326,67 +349,67 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
326 349 return -200;
327 350 }
328 351  
329   -std::int32_t ClientPaho::publish(const MqttMessage& message, int qos)
  352 +std::int32_t ClientPaho::publish( const MqttMessage& message, int qos )
330 353 {
331   - if (ConnectionStatus::DisconnectInProgress == m_connectionStatus)
  354 + if( ConnectionStatus::DisconnectInProgress == m_connectionStatus )
332 355 {
333   - // ("ClientPaho", "%1 - disconnect in progress, ignoring publish with qos %2 on topic %3", m_clientId, qos, message.topic());
  356 + LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) );
334 357 return -1;
335 358 }
336   - else if (ConnectionStatus::Disconnected == m_connectionStatus)
  359 + else if( ConnectionStatus::Disconnected == m_connectionStatus )
337 360 {
338   - // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId);
  361 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) );
339 362 connect( true );
340 363 }
341 364  
342   - if (!isValidTopic(message.topic()))
  365 + if( !isValidTopic(message.topic() ) )
343 366 {
344   - // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, message.topic());
  367 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) );
345 368 }
346 369  
347   - if (qos > 2)
  370 + if( qos > 2 )
348 371 {
349 372 qos = 2;
350 373 }
351   - else if (qos < 0)
  374 + else if( qos < 0 )
352 375 {
353 376 qos = 0;
354 377 }
355 378  
356   -
357 379 std::unique_lock<std::mutex> lck(m_mutex);
358   - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) {
  380 + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes )
  381 + {
359 382 m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
360   - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus) {
361   - // ("ClientPaho", "Adding publish to pending queue.");
362   - m_pendingPublishes.push_front(Publish{ qos, message });
  383 + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus )
  384 + {
  385 + LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." );
  386 + m_pendingPublishes.push_front( Publish{ qos, message } );
363 387 return -1;
364 388 }
365 389 }
366 390  
367   - return publishInternal(message, qos);
  391 + return publishInternal( message, qos );
368 392 }
369 393  
370 394 void ClientPaho::publishPending()
371 395 {
372 396 {
373 397 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
374   - if (!m_processPendingPublishes)
  398 + if( !m_processPendingPublishes )
375 399 {
376 400 return;
377 401 }
378 402 }
379 403  
380   - if (ConnectionStatus::Connected != m_connectionStatus)
  404 + if( ConnectionStatus::Connected != m_connectionStatus )
381 405 {
382   - // MqttException, "Not connected");
  406 + LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) )
383 407 }
384 408  
385   - while (!m_pendingPublishes.empty())
  409 + while( !m_pendingPublishes.empty() )
386 410 {
387 411 const auto& pub = m_pendingPublishes.back();
388   - publishInternal(pub.data, pub.qos);
389   - // else ("ClientPaho", "%1 - pending publish on topic %2 failed : %3", m_clientId, pub.data.topic(), e.what());
  412 + publishInternal( pub.data, pub.qos );
390 413  
391 414 m_pendingPublishes.pop_back();
392 415 }
... ... @@ -398,23 +421,23 @@ void ClientPaho::publishPending()
398 421 m_pendingPublishesReadyCV.notify_all();
399 422 }
400 423  
401   -std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb)
  424 +std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb )
402 425 {
403   - if (ConnectionStatus::Connected != m_connectionStatus)
  426 + if( ConnectionStatus::Connected != m_connectionStatus )
404 427 {
405 428 // MqttException, "Not connected"
406 429 }
407 430  
408   - if (!isValidTopic(topic))
  431 + if( !isValidTopic( topic ) )
409 432 {
410 433 // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
411 434 }
412 435  
413   - if (qos > 2)
  436 + if( qos > 2 )
414 437 {
415 438 qos = 2;
416 439 }
417   - else if (qos < 0)
  440 + else if( qos < 0 )
418 441 {
419 442 qos = 0;
420 443 }
... ... @@ -423,21 +446,27 @@ std::int32_t ClientPaho::subscribe(const std::string&amp; topic, int qos, const std:
423 446 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
424 447  
425 448 auto itExisting = m_subscriptions.find(topic);
426   - if (m_subscriptions.end() != itExisting) {
427   - if (itExisting->second.qos == qos) {
  449 + if( m_subscriptions.end() != itExisting )
  450 + {
  451 + if( itExisting->second.qos == qos )
  452 + {
428 453 return -1;
429 454 }
430 455 // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic);
431 456 }
432 457  
433 458 auto itPending = m_pendingSubscriptions.find(topic);
434   - if (m_pendingSubscriptions.end() != itPending) {
435   - if (itPending->second.qos == qos) {
436   - auto itToken = std::find_if(m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; });
437   - if (m_subscribeTokenToTopic.end() != itToken) {
  459 + if( m_pendingSubscriptions.end() != itPending )
  460 + {
  461 + if( itPending->second.qos == qos )
  462 + {
  463 + auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; } );
  464 + if( m_subscribeTokenToTopic.end() != itToken )
  465 + {
438 466 return itToken->first;
439 467 }
440   - else {
  468 + else
  469 + {
441 470 return -1;
442 471 }
443 472 }
... ... @@ -445,45 +474,45 @@ std::int32_t ClientPaho::subscribe(const std::string&amp; topic, int qos, const std:
445 474 }
446 475  
447 476 std::string existingTopic{};
448   - if (isOverlappingInternal(topic, existingTopic))
  477 + if( isOverlappingInternal( topic, existingTopic ) )
449 478 {
450 479 // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
451 480 }
452 481  
453   - // ("ClientPaho", "%1 - adding subscription on topic %2 to the pending subscriptions", m_clientId, topic);
454   - m_pendingSubscriptions.emplace(std::make_pair(topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb }));
  482 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
  483 + m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) );
455 484 }
456   - return subscribeInternal(topic, qos);
  485 + return subscribeInternal( topic, qos );
457 486 }
458 487  
459 488 void ClientPaho::resubscribe()
460 489 {
461   - decltype(m_pendingSubscriptions) pendingSubscriptions{};
  490 + decltype( m_pendingSubscriptions ) pendingSubscriptions{};
462 491 {
463 492 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
464   - std::copy(m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end()));
  493 + std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) );
465 494 }
466 495  
467   - for (const auto& s : pendingSubscriptions)
  496 + for( const auto& s : pendingSubscriptions )
468 497 {
469   - subscribeInternal(s.first, s.second.qos);
  498 + subscribeInternal( s.first, s.second.qos );
470 499 }
471 500 }
472 501  
473 502 std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos )
474 503 {
475 504 {
476   - OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  505 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
477 506 bool found = false;
478   - for (const auto& s : m_subscriptions)
  507 + for( const auto& s : m_subscriptions )
479 508 {
480   - if (topic == s.first && qos == s.second.qos)
  509 + if( topic == s.first && qos == s.second.qos )
481 510 {
482 511 found = true;
483 512 break;
484 513 }
485 514 }
486   - if (!found)
  515 + if( !found )
487 516 {
488 517 return -1;
489 518 }
... ... @@ -498,21 +527,21 @@ std::int32_t ClientPaho::unsubscribe( const std::string&amp; topic, int qos )
498 527 // Need to lock the mutex because it is possible that the callback is faster than
499 528 // the insertion of the token into the pending operations.
500 529 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
501   - auto rc = MQTTAsync_unsubscribe(m_client, topic.c_str(), &opts);
502   - if (MQTTASYNC_SUCCESS != rc)
  530 + auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts );
  531 + if( MQTTASYNC_SUCCESS != rc )
503 532 {
504   - // ("ClientPaho", "%1 - unsubscribe on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc));
  533 + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
505 534 }
506 535  
507   - if (!m_pendingOperations.insert(opts.token).second)
  536 + if( !m_pendingOperations.insert( opts.token ).second )
508 537 {
509   - // ("ClientPaho", "%1 unsubscribe - token %2 already in use", m_clientId, opts.token);
  538 + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token ) + " already in use" ) );
510 539 }
511 540  
512   - m_operationResult.erase(opts.token);
513   - if (m_unsubscribeTokenToTopic.count(opts.token) > 0)
  541 + m_operationResult.erase( opts.token );
  542 + if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 )
514 543 {
515   - // ("ClientPaho", "%1 - token already in use, replacing unsubscribe from topic %2 with topic %3", m_clientId, m_unsubscribeTokenToTopic[opts.token], topic);
  544 + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) );
516 545 }
517 546 m_lastUnsubscribe = opts.token; // centos7 workaround
518 547 m_unsubscribeTokenToTopic[opts.token] = topic;
... ... @@ -526,39 +555,42 @@ std::int32_t ClientPaho::unsubscribe( const std::string&amp; topic, int qos )
526 555  
527 556 void ClientPaho::unsubscribeAll()
528 557 {
529   - decltype(m_subscriptions) subscriptions{};
  558 + decltype( m_subscriptions ) subscriptions{};
530 559 {
531 560 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
532 561 subscriptions = m_subscriptions;
533 562 }
534 563  
535   - for (const auto& s : subscriptions) {
536   - this->unsubscribe(s.first, s.second.qos);
  564 + for( const auto& s : subscriptions )
  565 + {
  566 + this->unsubscribe( s.first, s.second.qos );
537 567 }
538 568 }
539 569  
540 570 std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const
541 571 {
542   - if (waitFor <= std::chrono::milliseconds(0)) {
543   - return std::chrono::milliseconds(0);
  572 + if( waitFor <= std::chrono::milliseconds( 0 ) )
  573 + {
  574 + return std::chrono::milliseconds( 0 );
544 575 }
545 576 std::chrono::milliseconds timeElapsed{};
546 577 {
547   - osdev::components::mqtt::measurement::TimeMeasurement msr("waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds)
  578 + osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds )
548 579 {
549   - timeElapsed = std::chrono::ceil<std::chrono::milliseconds>(sinceStart);
  580 + timeElapsed = std::chrono::ceil<std::chrono::milliseconds>( sinceStart );
550 581 });
551 582 std::unique_lock<std::mutex> lck(m_mutex);
  583 +
552 584 // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations);
553 585 m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]()
554 586 {
555   - if (tokens.empty())
  587 + if( tokens.empty() )
556 588 { // wait for all operations to end
557 589 return m_pendingOperations.empty();
558 590 }
559   - else if (tokens.size() == 1)
  591 + else if( tokens.size() == 1 )
560 592 {
561   - return m_pendingOperations.find(*tokens.cbegin()) == m_pendingOperations.end();
  593 + return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end();
562 594 }
563 595 std::vector<std::int32_t> intersect{};
564 596 std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect));
... ... @@ -568,16 +600,16 @@ std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::millisecond
568 600 return timeElapsed;
569 601 }
570 602  
571   -bool ClientPaho::isOverlapping(const std::string& topic) const
  603 +bool ClientPaho::isOverlapping( const std::string& topic ) const
572 604 {
573 605 std::string existingTopic{};
574   - return isOverlapping(topic, existingTopic);
  606 + return isOverlapping( topic, existingTopic );
575 607 }
576 608  
577   -bool ClientPaho::isOverlapping(const std::string& topic, std::string& existingTopic) const
  609 +bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const
578 610 {
579 611 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
580   - return isOverlappingInternal(topic, existingTopic);
  612 + return isOverlappingInternal( topic, existingTopic );
581 613 }
582 614  
583 615 std::vector<std::int32_t> ClientPaho::pendingOperations() const
... ... @@ -585,7 +617,7 @@ std::vector&lt;std::int32_t&gt; ClientPaho::pendingOperations() const
585 617 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
586 618 std::vector<std::int32_t> retval{};
587 619 retval.resize(m_pendingOperations.size());
588   - std::copy(m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin());
  620 + std::copy( m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin() );
589 621 return retval;
590 622 }
591 623  
... ... @@ -595,36 +627,36 @@ bool ClientPaho::hasPendingSubscriptions() const
595 627 return !m_pendingSubscriptions.empty();
596 628 }
597 629  
598   -boost::optional<bool> ClientPaho::operationResult(std::int32_t token) const
  630 +boost::optional<bool> ClientPaho::operationResult( std::int32_t token ) const
599 631 {
600 632 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
601 633 boost::optional<bool> ret{};
602   - auto cit = m_operationResult.find(token);
603   - if (m_operationResult.end() != cit)
  634 + auto cit = m_operationResult.find( token );
  635 + if( m_operationResult.end() != cit )
604 636 {
605 637 ret = cit->second;
606 638 }
607 639 return ret;
608 640 }
609 641  
610   -void ClientPaho::parseEndpoint(const std::string& _endpoint)
  642 +void ClientPaho::parseEndpoint( const std::string& _endpoint )
611 643 {
612   - auto ep = UriParser::parse(_endpoint);
613   - if (ep.find("user") != ep.end())
  644 + auto ep = UriParser::parse( _endpoint );
  645 + if( ep.find( "user" ) != ep.end() )
614 646 {
615 647 m_username = ep["user"];
616 648 ep["user"].clear();
617 649 }
618 650  
619   - if (ep.find("password") != ep.end())
  651 + if( ep.find( "password" ) != ep.end() )
620 652 {
621 653 m_password = ep["password"];
622 654 ep["password"].clear();
623 655 }
624   - m_endpoint = UriParser::toString(ep);
  656 + m_endpoint = UriParser::toString( ep );
625 657 }
626 658  
627   -std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos)
  659 +std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos )
628 660 {
629 661 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
630 662 opts.onSuccess = &ClientPaho::onPublishSuccess;
... ... @@ -637,21 +669,21 @@ std::int32_t ClientPaho::publishInternal(const MqttMessage&amp; message, int qos)
637 669 // the insertion of the token into the pending operations.
638 670  
639 671 // OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
640   - auto rc = MQTTAsync_sendMessage(m_client, message.topic().c_str(), &msg, &opts);
641   - if (MQTTASYNC_SUCCESS != rc)
  672 + auto rc = MQTTAsync_sendMessage( m_client, message.topic().c_str(), &msg, &opts );
  673 + if( MQTTASYNC_SUCCESS != rc )
642 674 {
643   - // ("ClientPaho", "%1 - publish on topic %2 failed with code %3", m_clientId, message.topic(), pahoAsyncErrorCodeToString(rc));
  675 + LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
644 676 }
645 677  
646   - if (!m_pendingOperations.insert(opts.token).second)
  678 + if( !m_pendingOperations.insert( opts.token ).second )
647 679 {
648   - // ("ClientPaho", "%1 publishInternal - token %2 already in use", m_clientId, opts.token);
  680 + LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
649 681 }
650   - m_operationResult.erase(opts.token);
  682 + m_operationResult.erase( opts.token );
651 683 return opts.token;
652 684 }
653 685  
654   -std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos)
  686 +std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos )
655 687 {
656 688 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
657 689 opts.onSuccess = &ClientPaho::onSubscribeSuccess;
... ... @@ -661,52 +693,51 @@ std::int32_t ClientPaho::subscribeInternal(const std::string&amp; topic, int qos)
661 693 // Need to lock the mutex because it is possible that the callback is faster than
662 694 // the insertion of the token into the pending operations.
663 695 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
664   - auto rc = MQTTAsync_subscribe(m_client, topic.c_str(), qos, &opts);
  696 + auto rc = MQTTAsync_subscribe( m_client, topic.c_str(), qos, &opts );
665 697 if (MQTTASYNC_SUCCESS != rc)
666 698 {
667   - m_pendingSubscriptions.erase(topic);
668   - // ("ClientPaho", "%1 - subscription on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc));
669   - // (MqttException, "Subscription failed");
  699 + m_pendingSubscriptions.erase( topic );
  700 + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
670 701 }
671 702  
672   - if (!m_pendingOperations.insert(opts.token).second)
  703 + if( !m_pendingOperations.insert( opts.token ).second )
673 704 {
674   - // ("ClientPaho", "%1 subscribe - token %2 already in use", m_clientId, opts.token);
  705 + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) );
675 706 }
676   - m_operationResult.erase(opts.token);
677   - if (m_subscribeTokenToTopic.count(opts.token) > 0)
  707 + m_operationResult.erase( opts.token );
  708 + if( m_subscribeTokenToTopic.count( opts.token ) > 0 )
678 709 {
679   - // ("ClientPaho", "%1 - overwriting pending subscription on topic %2 with topic %3", m_clientId, m_subscribeTokenToTopic[opts.token], topic);
  710 + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) );
680 711 }
681 712 m_subscribeTokenToTopic[opts.token] = topic;
682 713 return opts.token;
683 714 }
684 715  
685   -void ClientPaho::setConnectionStatus(ConnectionStatus status)
  716 +void ClientPaho::setConnectionStatus( ConnectionStatus status )
686 717 {
687 718 ConnectionStatus curStatus = m_connectionStatus;
688 719 m_connectionStatus = status;
689   - if (status != curStatus && m_connectionStatusCallback)
  720 + if( status != curStatus && m_connectionStatusCallback )
690 721 {
691   - m_connectionStatusCallback(m_clientId, status);
  722 + m_connectionStatusCallback( m_clientId, status );
692 723 }
693 724 }
694 725  
695   -bool ClientPaho::isOverlappingInternal(const std::string& topic, std::string& existingTopic) const
  726 +bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const
696 727 {
697 728 existingTopic.clear();
698   - for (const auto& s : m_pendingSubscriptions)
  729 + for( const auto& s : m_pendingSubscriptions )
699 730 {
700   - if (testForOverlap(s.first, topic))
  731 + if( testForOverlap( s.first, topic ) )
701 732 {
702 733 existingTopic = s.first;
703 734 return true;
704 735 }
705 736 }
706 737  
707   - for (const auto& s : m_subscriptions)
  738 + for( const auto& s : m_subscriptions )
708 739 {
709   - if (testForOverlap(s.first, topic))
  740 + if( testForOverlap( s.first, topic ) )
710 741 {
711 742 existingTopic = s.first;
712 743 return true;
... ... @@ -715,87 +746,86 @@ bool ClientPaho::isOverlappingInternal(const std::string&amp; topic, std::string&amp; ex
715 746 return false;
716 747 }
717 748  
718   -void ClientPaho::pushIncomingEvent(std::function<void()> ev)
  749 +void ClientPaho::pushIncomingEvent( std::function<void()> ev )
719 750 {
720   - m_callbackEventQueue.push(ev);
  751 + m_callbackEventQueue.push( ev );
721 752 }
722 753  
723 754 void ClientPaho::callbackEventHandler()
724 755 {
725   - // ("ClientPaho", "%1 - starting callback event handler", m_clientId);
726   - for (;;) {
  756 + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) );
  757 + for( ;; )
  758 + {
727 759 std::vector<std::function<void()>> events;
728   - if (!m_callbackEventQueue.pop(events))
  760 + if( !m_callbackEventQueue.pop(events) )
729 761 {
730 762 break;
731 763 }
732 764  
733   - for (const auto& ev : events)
  765 + for( const auto& ev : events )
734 766 {
735 767 ev();
736   - // ("ClientPaho", "%1 - Exception occurred: %2", m_clientId, mlogicException);
737 768 }
738 769 }
739   - // ("ClientPaho", "%1 - leaving callback event handler", m_clientId);
  770 + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - leaving callback event handler" ) );
740 771 }
741   -
742   -void ClientPaho::onConnectOnInstance(const std::string& cause)
  772 +void ClientPaho::onConnectOnInstance( const std::string& cause )
743 773 {
744   - (void)cause;
745   - // toLogFile ("ClientPaho", "onConnectOnInstance %1 - reconnected (cause %2)", m_clientId, cause);
  774 + (void) cause;
746 775 {
747 776 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
748   - std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end()));
  777 + std::copy( m_subscriptions.begin(), m_subscriptions.end(), std::inserter( m_pendingSubscriptions, m_pendingSubscriptions.end() ) );
749 778 m_subscriptions.clear();
750 779 m_processPendingPublishes = true; // all publishes are on hold until publishPending is called.
751 780 }
752 781  
753   - setConnectionStatus(ConnectionStatus::Connected);
  782 + setConnectionStatus( ConnectionStatus::Connected );
754 783 }
755 784  
756   -void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response)
  785 +void ClientPaho::onConnectSuccessOnInstance()
757 786 {
758   - auto connectData = response.connectionData();
759   - // ("ClientPaho", "onConnectSuccessOnInstance %1 - connected to endpoint %2 (mqtt version %3, session present %4)",
760   - // m_clientId, connectData.serverUri(), connectData.mqttVersion(), connectData.sessionPresent());
761 787 {
762 788 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
763 789 // Register the connect callback that is used in reconnect scenarios.
764   - auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect);
765   - if (MQTTASYNC_SUCCESS != rc)
  790 + auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect );
  791 + if( MQTTASYNC_SUCCESS != rc )
766 792 {
767   - // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the connected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
  793 + LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
768 794 }
  795 +
769 796 // For MQTTV5
770 797 //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
771 798 //if (MQTTASYNC_SUCCESS != rc) {
772 799 // // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
773 800 //}
774 801 // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
  802 +
775 803 m_operationResult[-100] = true;
776 804 m_pendingOperations.erase(-100);
777 805 }
778   - setConnectionStatus(ConnectionStatus::Connected);
779   - if (m_connectPromise)
  806 +
  807 + setConnectionStatus( ConnectionStatus::Connected );
  808 + if( m_connectPromise )
780 809 {
  810 + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) );
781 811 m_connectPromise->set_value();
782 812 }
783 813 m_operationsCompleteCV.notify_all();
784 814 }
785 815  
786   -void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response)
  816 +void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response )
787 817 {
788   - (void)response;
789   - // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
  818 + (void) response;
  819 + LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
790 820 {
791 821 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
792 822 // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
793 823 m_operationResult[-100] = false;
794 824 m_pendingOperations.erase(-100);
795 825 }
796   - if (ConnectionStatus::ConnectInProgress == m_connectionStatus)
  826 + if( ConnectionStatus::ConnectInProgress == m_connectionStatus )
797 827 {
798   - setConnectionStatus(ConnectionStatus::Disconnected);
  828 + setConnectionStatus( ConnectionStatus::Disconnected );
799 829 }
800 830 m_operationsCompleteCV.notify_all();
801 831 }
... ... @@ -805,9 +835,9 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure&amp; response)
805 835 // MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode));
806 836 //}
807 837  
808   -void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&)
  838 +void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& )
809 839 {
810   - // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - disconnected from endpoint %2", m_clientId, m_endpoint);
  840 + LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) );
811 841 {
812 842 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
813 843 m_subscriptions.clear();
... ... @@ -820,45 +850,46 @@ void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&amp;)
820 850 m_pendingOperations.clear();
821 851 }
822 852  
823   - setConnectionStatus(ConnectionStatus::Disconnected);
  853 + setConnectionStatus( ConnectionStatus::Disconnected );
824 854  
825   - if (m_disconnectPromise) {
  855 + if( m_disconnectPromise )
  856 + {
826 857 m_disconnectPromise->set_value();
827 858 }
828 859 m_operationsCompleteCV.notify_all();
829 860 }
830 861  
831   -void ClientPaho::onDisconnectFailureOnInstance(const MqttFailure& response)
  862 +void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response )
832 863 {
833   - (void)response;
834   - // ("ClientPaho", "onDisconnectFailureOnInstance %1 - disconnect failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
  864 + (void) response;
  865 + LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) );
835 866 {
836   - OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  867 + OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
837 868 // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations);
838 869 m_operationResult[-200] = false;
839 870 m_pendingOperations.erase(-200);
840 871 }
841 872  
842   - if (MQTTAsync_isConnected(m_client))
  873 + if( MQTTAsync_isConnected( m_client ) )
843 874 {
844   - setConnectionStatus(ConnectionStatus::Connected);
  875 + setConnectionStatus( ConnectionStatus::Connected );
845 876 }
846 877 else
847 878 {
848   - setConnectionStatus(ConnectionStatus::Disconnected);
  879 + setConnectionStatus( ConnectionStatus::Disconnected );
849 880 }
850 881  
851   - if (m_disconnectPromise)
  882 + if( m_disconnectPromise )
852 883 {
853 884 m_disconnectPromise->set_value();
854 885 }
855 886 m_operationsCompleteCV.notify_all();
856 887 }
857 888  
858   -void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess& response)
  889 +void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
859 890 {
860 891 auto pd = response.publishData();
861   - // ("ClientPaho", "onPublishSuccessOnInstance %1 - publish with token %2 succeeded (message was %3)", m_clientId, response.token(), pd.payload());
  892 + LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
862 893 {
863 894 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
864 895 // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
... ... @@ -868,9 +899,9 @@ void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess&amp; response)
868 899 m_operationsCompleteCV.notify_all();
869 900 }
870 901  
871   -void ClientPaho::onPublishFailureOnInstance(const MqttFailure& response)
  902 +void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response )
872 903 {
873   - // ("ClientPaho", "onPublishFailureOnInstance %1 - publish with token %2 failed with code %3 (%4)", m_clientId, response.token(), response.codeToString(), response.message());
  904 + LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
874 905 {
875 906 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
876 907 // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
... ... @@ -880,78 +911,82 @@ void ClientPaho::onPublishFailureOnInstance(const MqttFailure&amp; response)
880 911 m_operationsCompleteCV.notify_all();
881 912 }
882 913  
883   -void ClientPaho::onSubscribeSuccessOnInstance(const MqttSuccess& response)
  914 +void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response )
884 915 {
885   - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscribe with token %2 succeeded", m_clientId, response.token());
886   - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
  916 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) );
  917 +
  918 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
887 919 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
888 920 bool operationOk = false;
889   - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response, &operationOk]()
  921 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]()
890 922 {
891 923 // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
892 924 m_operationResult[response.token()] = operationOk;
893   - m_pendingOperations.erase(response.token());
  925 + m_pendingOperations.erase( response.token() );
894 926 });
895   - auto it = m_subscribeTokenToTopic.find(response.token());
896   - if (m_subscribeTokenToTopic.end() == it) {
897   - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, response.token());
  927 + auto it = m_subscribeTokenToTopic.find( response.token() );
  928 + if( m_subscribeTokenToTopic.end() == it )
  929 + {
  930 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
898 931 return;
899 932 }
900 933 auto topic = it->second;
901 934 m_subscribeTokenToTopic.erase(it);
902 935  
903   - auto pendingIt = m_pendingSubscriptions.find(topic);
904   - if (m_pendingSubscriptions.end() == pendingIt)
  936 + auto pendingIt = m_pendingSubscriptions.find( topic );
  937 + if( m_pendingSubscriptions.end() == pendingIt )
905 938 {
906   - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token());
  939 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
907 940 return;
908 941 }
909   - if (response.qos() != pendingIt->second.qos)
  942 + if( response.qos() != pendingIt->second.qos )
910 943 {
911   - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscription requested qos %2, endpoint assigned qos %3", m_clientId, pendingIt->second.qos, response.qos());
  944 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) );
912 945 }
913   - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - move pending subscription on topic %2 to the registered subscriptions", m_clientId, topic);
914   - m_subscriptions.emplace(std::make_pair(pendingIt->first, std::move(pendingIt->second)));
915   - m_pendingSubscriptions.erase(pendingIt);
  946 +
  947 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) );
  948 + m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) );
  949 + m_pendingSubscriptions.erase( pendingIt );
916 950 operationOk = true;
917 951 }
918 952  
919   -void ClientPaho::onSubscribeFailureOnInstance(const MqttFailure& response)
  953 +void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response )
920 954 {
921   - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
922   - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
  955 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
  956 +
  957 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
923 958 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
924   - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]()
  959 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
925 960 {
926 961 // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
927 962 m_operationResult[response.token()] = false;
928   - m_pendingOperations.erase(response.token());
  963 + m_pendingOperations.erase( response.token() );
929 964 });
930 965  
931   - auto it = m_subscribeTokenToTopic.find(response.token());
932   - if (m_subscribeTokenToTopic.end() == it)
  966 + auto it = m_subscribeTokenToTopic.find( response.token() );
  967 + if( m_subscribeTokenToTopic.end() == it )
933 968 {
934   - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token());
  969 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
935 970 return;
936 971 }
937 972 auto topic = it->second;
938   - m_subscribeTokenToTopic.erase(it);
  973 + m_subscribeTokenToTopic.erase( it );
939 974  
940   - auto pendingIt = m_pendingSubscriptions.find(topic);
941   - if (m_pendingSubscriptions.end() == pendingIt)
  975 + auto pendingIt = m_pendingSubscriptions.find( topic );
  976 + if( m_pendingSubscriptions.end() == pendingIt )
942 977 {
943   - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token());
  978 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
944 979 return;
945 980 }
946   - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - remove pending subscription on topic %2", m_clientId, topic);
947   - m_pendingSubscriptions.erase(pendingIt);
  981 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) );
  982 + m_pendingSubscriptions.erase( pendingIt );
948 983 }
949 984  
950   -void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response)
  985 +void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response )
951 986 {
952   - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unsubscribe with token %2 succeeded", m_clientId, response.token());
  987 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) );
953 988  
954   - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
  989 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
955 990 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
956 991  
957 992 // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token.
... ... @@ -960,116 +995,117 @@ void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess&amp; response)
960 995 // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled
961 996 // sequentially (see ClientPaho::unsubscribe)!
962 997 auto token = response.token();
963   - if (-1 == token)
  998 + if( -1 == token )
964 999 {
965 1000 token = m_lastUnsubscribe;
966 1001 m_lastUnsubscribe = -1;
967 1002 }
968 1003  
969 1004 bool operationOk = false;
970   - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, token, &operationOk]()
  1005 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]()
971 1006 {
972 1007 // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token);
973 1008 m_operationResult[token] = operationOk;
974   - m_pendingOperations.erase(token);
  1009 + m_pendingOperations.erase( token );
975 1010 });
976 1011  
977   - auto it = m_unsubscribeTokenToTopic.find(token);
978   - if (m_unsubscribeTokenToTopic.end() == it)
  1012 + auto it = m_unsubscribeTokenToTopic.find( token );
  1013 + if( m_unsubscribeTokenToTopic.end() == it )
979 1014 {
980   - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, token);
  1015 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) );
981 1016 return;
982 1017 }
983 1018 auto topic = it->second;
984   - m_unsubscribeTokenToTopic.erase(it);
  1019 + m_unsubscribeTokenToTopic.erase( it );
985 1020  
986   - auto registeredIt = m_subscriptions.find(topic);
987   - if (m_subscriptions.end() == registeredIt) {
988   - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - cannot find subscription for token %2", m_clientId, response.token());
  1021 + auto registeredIt = m_subscriptions.find( topic );
  1022 + if( m_subscriptions.end() == registeredIt )
  1023 + {
  1024 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) );
989 1025 return;
990 1026 }
991   - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - remove subscription on topic %2 from the registered subscriptions", m_clientId, topic);
992   - m_subscriptions.erase(registeredIt);
  1027 +
  1028 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) );
  1029 + m_subscriptions.erase( registeredIt );
993 1030 operationOk = true;
994 1031 }
995 1032  
996   -void ClientPaho::onUnsubscribeFailureOnInstance(const MqttFailure& response)
  1033 +void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response )
997 1034 {
998   - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
999   - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
  1035 + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
  1036 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
1000 1037 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
1001   - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]()
  1038 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
1002 1039 {
1003 1040 // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
1004 1041 m_operationResult[response.token()] = false;
1005   - m_pendingOperations.erase(response.token());
  1042 + m_pendingOperations.erase( response.token() );
1006 1043 });
1007 1044  
1008   - auto it = m_unsubscribeTokenToTopic.find(response.token());
1009   - if (m_unsubscribeTokenToTopic.end() == it)
  1045 + auto it = m_unsubscribeTokenToTopic.find( response.token() );
  1046 + if( m_unsubscribeTokenToTopic.end() == it )
1010 1047 {
1011   - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token());
  1048 + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
1012 1049 return;
1013 1050 }
1014 1051 auto topic = it->second;
1015   - m_unsubscribeTokenToTopic.erase(it);
  1052 + m_unsubscribeTokenToTopic.erase( it );
1016 1053 }
1017 1054  
1018   -int ClientPaho::onMessageArrivedOnInstance(const MqttMessage& message)
  1055 +int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message )
1019 1056 {
1020   - // ("ClientPaho", "onMessageArrivedOnInstance %1 - received message on topic %2, retained : %3, dup : %4", m_clientId, message.topic(), message.retained(), message.duplicate());
1021   -
1022   - std::function<void(MqttMessage)> cb;
  1057 + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) );
1023 1058  
  1059 + std::function<void( MqttMessage )> cb;
1024 1060 {
1025 1061 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
1026   - for (const auto& s : m_subscriptions)
  1062 + for( const auto& s : m_subscriptions )
1027 1063 {
1028   - if (boost::regex_match(message.topic(), s.second.topicRegex))
  1064 + if( boost::regex_match( message.topic(), s.second.topicRegex ) )
1029 1065 {
1030 1066 cb = s.second.callback;
1031 1067 }
1032 1068 }
1033 1069 }
1034 1070  
1035   - if (cb)
  1071 + if( cb )
1036 1072 {
1037   - cb(message);
  1073 + cb( message );
1038 1074 }
1039 1075 else
1040 1076 {
1041   - // ("ClientPaho", "onMessageArrivedOnInstance %1 - no topic filter found for message received on topic %2", m_clientId, message.topic());
  1077 + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) );
1042 1078 }
1043 1079 return 1;
1044 1080 }
1045 1081  
1046   -void ClientPaho::onDeliveryCompleteOnInstance(MQTTAsync_token token)
  1082 +void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token )
1047 1083 {
1048   - // ("ClientPaho", "onDeliveryCompleteOnInstance %1 - message with token %2 is delivered", m_clientId, token);
1049   - if (m_deliveryCompleteCallback)
  1084 + LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) );
  1085 + if( m_deliveryCompleteCallback )
1050 1086 {
1051   - m_deliveryCompleteCallback(m_clientId, static_cast<std::int32_t>(token));
  1087 + m_deliveryCompleteCallback( m_clientId, static_cast<std::int32_t>( token ) );
1052 1088 }
1053 1089 }
1054 1090  
1055   -void ClientPaho::onConnectionLostOnInstance(const std::string& cause)
  1091 +void ClientPaho::onConnectionLostOnInstance( const std::string& cause )
1056 1092 {
1057   - (void)cause;
  1093 + (void) cause;
1058 1094 // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause);
1059   - setConnectionStatus(ConnectionStatus::ReconnectInProgress);
  1095 + setConnectionStatus( ConnectionStatus::ReconnectInProgress );
1060 1096  
1061 1097 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
1062 1098 // Remove all tokens related to subscriptions from the active operations.
1063   - for (const auto& p : m_subscribeTokenToTopic)
  1099 + for( const auto& p : m_subscribeTokenToTopic )
1064 1100 {
1065 1101 // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
1066   - m_pendingOperations.erase(p.first);
  1102 + m_pendingOperations.erase( p.first );
1067 1103 }
1068 1104  
1069   - for (const auto& p : m_unsubscribeTokenToTopic)
  1105 + for( const auto& p : m_unsubscribeTokenToTopic )
1070 1106 {
1071 1107 // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
1072   - m_pendingOperations.erase(p.first);
  1108 + m_pendingOperations.erase( p.first );
1073 1109 }
1074 1110 // Clear the administration used in the subscribe process.
1075 1111 m_subscribeTokenToTopic.clear();
... ... @@ -1077,39 +1113,55 @@ void ClientPaho::onConnectionLostOnInstance(const std::string&amp; cause)
1077 1113 }
1078 1114  
1079 1115 // static
1080   -void ClientPaho::onConnect(void* context, char* cause)
  1116 +void ClientPaho::onFirstConnect( void* context, char* cause )
1081 1117 {
1082   - if (context)
  1118 + LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." );
  1119 + if( context )
1083 1120 {
1084   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1085   - std::string reason(nullptr == cause ? "unknown cause" : cause);
1086   - cl->pushIncomingEvent([cl, reason]() { cl->onConnectOnInstance(reason); });
  1121 + auto *cl = reinterpret_cast<ClientPaho*>( context );
  1122 + std::string reason( nullptr == cause ? "Unknown cause" : cause );
  1123 + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } );
  1124 + }
  1125 +}
  1126 +
  1127 +void ClientPaho::onConnect( void* context, char* cause )
  1128 +{
  1129 + LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." );
  1130 + if( context )
  1131 + {
  1132 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1133 + std::string reason( nullptr == cause ? "unknown cause" : cause );
  1134 + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } );
1087 1135 }
1088 1136 }
1089 1137  
1090 1138 // static
1091   -void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response)
  1139 +void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response )
1092 1140 {
1093   - if (context)
  1141 + LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." );
  1142 + if( context )
1094 1143 {
1095   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1096   - if (!response) {
  1144 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1145 + if( !response )
  1146 + {
1097 1147 // connect should always have a valid response struct.
1098   - // ("ClientPaho", "onConnectSuccess - no response data");
  1148 + LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" );
  1149 + return;
1099 1150 }
1100   - MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
1101   - cl->pushIncomingEvent([cl, resp]() { cl->onConnectSuccessOnInstance(resp); });
  1151 + // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
  1152 + cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } );
1102 1153 }
1103 1154 }
1104 1155  
1105 1156 // static
1106   -void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response)
  1157 +void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response )
1107 1158 {
1108   - if (context)
  1159 + LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) );
  1160 + if( context )
1109 1161 {
1110   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1111   - MqttFailure resp(response);
1112   - cl->pushIncomingEvent([cl, resp]() { cl->onConnectFailureOnInstance(resp); });
  1162 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1163 + MqttFailure resp( response );
  1164 + cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } );
1113 1165 }
1114 1166 }
1115 1167  
... ... @@ -1134,34 +1186,35 @@ void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response
1134 1186 //}
1135 1187  
1136 1188 // static
1137   -void ClientPaho::onDisconnectSuccess(void* context, MQTTAsync_successData* response)
  1189 +void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response )
1138 1190 {
1139   - if (context)
  1191 + if( context )
1140 1192 {
1141   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1142   - MqttSuccess resp(response ? response->token : 0);
1143   - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectSuccessOnInstance(resp); });
  1193 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1194 + MqttSuccess resp( response ? response->token : 0 );
  1195 + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } );
1144 1196 }
1145 1197 }
1146 1198  
1147 1199 // static
1148   -void ClientPaho::onDisconnectFailure(void* context, MQTTAsync_failureData* response)
  1200 +void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response )
1149 1201 {
1150   - if (context)
  1202 + LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." );
  1203 + if( context )
1151 1204 {
1152   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1153   - MqttFailure resp(response);
1154   - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectFailureOnInstance(resp); });
  1205 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1206 + MqttFailure resp( response );
  1207 + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } );
1155 1208 }
1156 1209 }
1157 1210  
1158 1211 // static
1159   -void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response)
  1212 +void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response )
1160 1213 {
1161   - if (context)
  1214 + if( context )
1162 1215 {
1163   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1164   - if (!response)
  1216 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1217 + if( !response )
1165 1218 {
1166 1219 // publish should always have a valid response struct.
1167 1220 // toLogFile ("ClientPaho", "onPublishSuccess - no response data");
... ... @@ -1172,134 +1225,137 @@ void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response
1172 1225 }
1173 1226  
1174 1227 // static
1175   -void ClientPaho::onPublishFailure(void* context, MQTTAsync_failureData* response)
  1228 +void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response )
1176 1229 {
1177   - (void)response;
1178   - if (context)
  1230 + (void) response;
  1231 + if( context )
1179 1232 {
1180   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1181   - MqttFailure resp(response);
1182   - cl->pushIncomingEvent([cl, resp]() { cl->onPublishFailureOnInstance(resp); });
  1233 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1234 + MqttFailure resp( response );
  1235 + cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } );
1183 1236 }
1184 1237 }
1185 1238  
1186 1239 // static
1187   -void ClientPaho::onSubscribeSuccess(void* context, MQTTAsync_successData* response)
  1240 +void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response )
1188 1241 {
1189   - if (context)
  1242 + if( context )
1190 1243 {
1191   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1192   - if (!response)
  1244 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1245 + if( !response )
1193 1246 {
1194 1247 // subscribe should always have a valid response struct.
1195 1248 // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data");
1196 1249 }
1197   - MqttSuccess resp(response->token, response->alt.qos);
1198   - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeSuccessOnInstance(resp); });
  1250 + MqttSuccess resp( response->token, response->alt.qos );
  1251 + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } );
1199 1252 }
1200 1253 }
1201 1254  
1202 1255 // static
1203   -void ClientPaho::onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  1256 +void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response )
1204 1257 {
1205   - if (context)
  1258 + if( context )
1206 1259 {
1207   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1208   - MqttFailure resp(response);
1209   - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeFailureOnInstance(resp); });
  1260 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1261 + MqttFailure resp( response );
  1262 + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } );
1210 1263 }
1211 1264 }
1212 1265  
1213 1266 // static
1214   -void ClientPaho::onUnsubscribeSuccess(void* context, MQTTAsync_successData* response)
  1267 +void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response )
1215 1268 {
1216   - if (context)
  1269 + if( context )
1217 1270 {
1218   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1219   - MqttSuccess resp(response ? response->token : -1);
1220   - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeSuccessOnInstance(resp); });
  1271 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1272 + MqttSuccess resp( response ? response->token : -1 );
  1273 + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } );
1221 1274 }
1222 1275 }
1223 1276  
1224 1277 // static
1225   -void ClientPaho::onUnsubscribeFailure(void* context, MQTTAsync_failureData* response)
  1278 +void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response )
1226 1279 {
1227   - if (context)
  1280 + if( context )
1228 1281 {
1229   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1230   - MqttFailure resp(response);
1231   - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeFailureOnInstance(resp); });
  1282 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1283 + MqttFailure resp( response );
  1284 + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } );
1232 1285 }
1233 1286 }
1234 1287  
1235 1288 // static
1236   -int ClientPaho::onMessageArrived(void* context, char* topicName, int, MQTTAsync_message* message)
  1289 +int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message )
1237 1290 {
1238 1291  
1239   - OSDEV_COMPONENTS_SCOPEGUARD(freeMessage, [&topicName, &message]()
  1292 + OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]()
1240 1293 {
1241   - MQTTAsync_freeMessage(&message);
1242   - MQTTAsync_free(topicName);
  1294 + MQTTAsync_freeMessage( &message );
  1295 + MQTTAsync_free( topicName );
1243 1296 });
1244 1297  
1245   - if (context)
  1298 + if( context )
1246 1299 {
1247   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1248   - MqttMessage msg(topicName, *message);
1249   - cl->pushIncomingEvent([cl, msg]() { cl->onMessageArrivedOnInstance(msg); });
  1300 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1301 + MqttMessage msg( topicName, *message );
  1302 + cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } );
1250 1303 }
1251 1304  
1252 1305 return 1; // always return true. Otherwise this callback is triggered again.
1253 1306 }
1254 1307  
1255 1308 // static
1256   -void ClientPaho::onDeliveryComplete(void* context, MQTTAsync_token token)
  1309 +void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token )
1257 1310 {
1258   - if (context)
  1311 + if( context )
1259 1312 {
1260   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1261   - cl->pushIncomingEvent([cl, token]() { cl->onDeliveryCompleteOnInstance(token); });
  1313 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1314 + cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } );
1262 1315 }
1263 1316 }
1264 1317  
1265 1318 // static
1266   -void ClientPaho::onConnectionLost(void* context, char* cause)
  1319 +void ClientPaho::onConnectionLost( void* context, char* cause )
1267 1320 {
1268   - OSDEV_COMPONENTS_SCOPEGUARD(freeCause, [&cause]()
  1321 + OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]()
1269 1322 {
1270   - if (cause)
  1323 + if( cause )
1271 1324 {
1272   - MQTTAsync_free(cause);
  1325 + MQTTAsync_free( cause );
1273 1326 }
1274 1327 });
1275 1328  
1276   - if (context)
  1329 + if( context )
1277 1330 {
1278   - auto* cl = reinterpret_cast<ClientPaho*>(context);
1279   - std::string msg(nullptr == cause ? "cause unknown" : cause);
1280   - cl->pushIncomingEvent([cl, msg]() { cl->onConnectionLostOnInstance(msg); });
  1331 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1332 + std::string msg( nullptr == cause ? "cause unknown" : cause );
  1333 + cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } );
1281 1334 }
1282 1335 }
1283 1336  
1284 1337 // static
1285   -void ClientPaho::onLogPaho(enum MQTTASYNC_TRACE_LEVELS level, char* message)
  1338 +void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message )
1286 1339 {
1287   - (void)message;
1288   - switch (level)
  1340 + (void) message;
  1341 + switch( level )
1289 1342 {
1290 1343 case MQTTASYNC_TRACE_MAXIMUM:
1291 1344 case MQTTASYNC_TRACE_MEDIUM:
1292   - case MQTTASYNC_TRACE_MINIMUM: {
  1345 + case MQTTASYNC_TRACE_MINIMUM:
  1346 + {
1293 1347 // ("ClientPaho", "paho - %1", message)
1294 1348 break;
1295 1349 }
1296   - case MQTTASYNC_TRACE_PROTOCOL: {
  1350 + case MQTTASYNC_TRACE_PROTOCOL:
  1351 + {
1297 1352 // ("ClientPaho", "paho - %1", message)
1298 1353 break;
1299 1354 }
1300 1355 case MQTTASYNC_TRACE_ERROR:
1301 1356 case MQTTASYNC_TRACE_SEVERE:
1302   - case MQTTASYNC_TRACE_FATAL: {
  1357 + case MQTTASYNC_TRACE_FATAL:
  1358 + {
1303 1359 // ("ClientPaho", "paho - %1", message)
1304 1360 break;
1305 1361 }
... ...
src/connectionstatus.cpp
... ... @@ -38,5 +38,5 @@ std::ostream&amp; operator&lt;&lt;(std::ostream &amp;os, ConnectionStatus rhs)
38 38 case ConnectionStatus::Connected:
39 39 return os << "Connected";
40 40 }
41   - return os << "Unknown"; // Should never be reached.
  41 + return os << "Unknown"; // "Should" never be reached. ( But guess what? )
42 42 }
... ...
src/log.cpp
... ... @@ -53,7 +53,7 @@ int toInt( LogLevel level )
53 53  
54 54 std::string Log::s_context = std::string();
55 55 std::string Log::s_fileName = std::string();
56   -LogLevel Log::s_logLevel = LogLevel::Error;
  56 +LogLevel Log::s_logLevel = LogLevel::Debug;
57 57 LogMask Log::s_logMask = LogMask::None;
58 58  
59 59 void Log::init( const std::string& context, const std::string& logFile, LogLevel logDepth )
... ...
src/mqttclient.cpp
... ... @@ -125,7 +125,7 @@ StateEnum MqttClient::state() const
125 125 return m_serverState.state();
126 126 }
127 127  
128   -void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt )
  128 +void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt, bool blocking )
129 129 {
130 130 osdev::components::mqtt::ParsedUri _endpoint = {
131 131 { "scheme", "tcp" },
... ... @@ -135,10 +135,10 @@ void MqttClient::connect(const std::string&amp; host, int port, const Credentials &amp;c
135 135 { "port", std::to_string(port) }
136 136 };
137 137  
138   - this->connect( UriParser::toString( _endpoint ), lwt );
  138 + this->connect( UriParser::toString( _endpoint ), lwt, blocking );
139 139 }
140 140  
141   -void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt )
  141 +void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt, bool blocking )
142 142 {
143 143 LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) );
144 144  
... ... @@ -171,7 +171,7 @@ void MqttClient::connect( const std::string &amp;_endpoint, const mqtt_LWT &amp;lwt )
171 171 client = m_principalClient.get();
172 172 }
173 173  
174   - client->connect( true, lwt );
  174 + client->connect( blocking, lwt );
175 175 }
176 176  
177 177 void MqttClient::disconnect()
... ... @@ -254,12 +254,15 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
254 254 // throw (?)(MqttException, "Not connected");
255 255 return Token(m_clientId, -1);
256 256 }
257   - if (!m_principalClient->isOverlapping(topic)) {
  257 + if (!m_principalClient->isOverlapping(topic))
  258 + {
258 259 client = m_principalClient.get();
259 260 clientFound = true;
260 261 }
261   - else {
262   - for (const auto& c : m_additionalClients) {
  262 + else
  263 + {
  264 + for (const auto& c : m_additionalClients)
  265 + {
263 266 if (!c->isOverlapping(topic)) {
264 267 client = c.get();
265 268 clientFound = true;
... ... @@ -267,7 +270,8 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
267 270 }
268 271 }
269 272 }
270   - if (!clientFound) {
  273 + if (!clientFound)
  274 + {
271 275 LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) );
272 276 std::string derivedClientId(generateUniqueClientId(m_clientId, m_additionalClients.size() + 2)); // principal client is nr 1.
273 277 m_additionalClients.emplace_back(std::make_unique<ClientPaho>(
... ... @@ -278,7 +282,8 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
278 282 client = m_additionalClients.back().get();
279 283 }
280 284 }
281   - if (!clientFound) {
  285 + if (!clientFound)
  286 + {
282 287 client->connect(true);
283 288 }
284 289 return Token{ client->clientId(), client->subscribe(topic, qos, cb) };
... ...