Commit ca0cf29e2d7df54d53b1ee7c557cabb67a7beb4e

Authored by Steven
1 parent 0c424e03

syntax fixes, converted logs to correct Log format.

Showing 1 changed file with 260 additions and 255 deletions
src/clientpaho.cpp
@@ -94,10 +94,10 @@ struct Init @@ -94,10 +94,10 @@ struct Init
94 94
95 std::atomic_int ClientPaho::s_numberOfInstances(0); 95 std::atomic_int ClientPaho::s_numberOfInstances(0);
96 96
97 -ClientPaho::ClientPaho(const std::string& _endpoint, 97 +ClientPaho::ClientPaho( const std::string& _endpoint,
98 const std::string& _id, 98 const std::string& _id,
99 - const std::function<void(const std::string&, ConnectionStatus)>& connectionStatusCallback,  
100 - const std::function<void(const std::string& clientId, std::int32_t pubMsgToken)>& deliveryCompleteCallback) 99 + const std::function<void( const std::string&, ConnectionStatus )>& connectionStatusCallback,
  100 + const std::function<void( const std::string& clientId, std::int32_t pubMsgToken )>& deliveryCompleteCallback )
101 : m_mutex() 101 : m_mutex()
102 , m_endpoint() 102 , m_endpoint()
103 , m_username() 103 , m_username()
@@ -123,19 +123,19 @@ ClientPaho::ClientPaho(const std::string&amp; _endpoint, @@ -123,19 +123,19 @@ ClientPaho::ClientPaho(const std::string&amp; _endpoint,
123 , m_callbackEventQueue(m_clientId) 123 , m_callbackEventQueue(m_clientId)
124 , m_workerThread() 124 , m_workerThread()
125 { 125 {
126 - if (0 == s_numberOfInstances++) 126 + if( 0 == s_numberOfInstances++ )
127 { 127 {
128 - MQTTAsync_setTraceCallback(&ClientPaho::onLogPaho); 128 + MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho );
129 } 129 }
130 130
131 - LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho ") ); 131 + LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) );
132 parseEndpoint(_endpoint); 132 parseEndpoint(_endpoint);
133 133
134 - auto rc = MQTTAsync_create(&m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);  
135 - if (MQTTASYNC_SUCCESS == rc) 134 + auto rc = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr );
  135 + if( MQTTASYNC_SUCCESS == rc )
136 { 136 {
137 - MQTTAsync_setCallbacks(m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete);  
138 - m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this); 137 + MQTTAsync_setCallbacks( m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete );
  138 + m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this );
139 } 139 }
140 else 140 else
141 { 141 {
@@ -145,33 +145,33 @@ ClientPaho::ClientPaho(const std::string&amp; _endpoint, @@ -145,33 +145,33 @@ ClientPaho::ClientPaho(const std::string&amp; _endpoint,
145 145
146 ClientPaho::~ClientPaho() 146 ClientPaho::~ClientPaho()
147 { 147 {
148 - LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - dtor ClientPao" ) ); 148 + LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) );
149 if( MQTTAsync_isConnected( m_client ) ) 149 if( MQTTAsync_isConnected( m_client ) )
150 { 150 {
151 this->unsubscribeAll(); 151 this->unsubscribeAll();
152 152
153 - this->waitForCompletion(std::chrono::milliseconds(2000), std::set<int32_t>{});  
154 - this->disconnect(true, 5000); 153 + this->waitForCompletion( std::chrono::milliseconds(2000), std::set<int32_t>{} );
  154 + this->disconnect( true, 5000 );
155 } 155 }
156 else 156 else
157 { 157 {
158 // If the status was already disconnected this call does nothing 158 // If the status was already disconnected this call does nothing
159 - setConnectionStatus(ConnectionStatus::Disconnected); 159 + setConnectionStatus( ConnectionStatus::Disconnected );
160 } 160 }
161 161
162 - if (0 == --s_numberOfInstances) 162 + if( 0 == --s_numberOfInstances )
163 { 163 {
164 // encountered a case where termination of the logging system within paho led to a segfault. 164 // encountered a case where termination of the logging system within paho led to a segfault.
165 // This was a paho thread that was cleaned while at the same time the logging system was terminated. 165 // This was a paho thread that was cleaned while at the same time the logging system was terminated.
166 // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less 166 // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less
167 // frequently. 167 // frequently.
168 - MQTTAsync_setTraceCallback(nullptr); 168 + MQTTAsync_setTraceCallback( nullptr );
169 } 169 }
170 170
171 - MQTTAsync_destroy(&m_client); 171 + MQTTAsync_destroy( &m_client );
172 172
173 m_callbackEventQueue.stop(); 173 m_callbackEventQueue.stop();
174 - if (m_workerThread.joinable()) 174 + if( m_workerThread.joinable() )
175 { 175 {
176 m_workerThread.join(); 176 m_workerThread.join();
177 } 177 }
@@ -212,11 +212,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt ) @@ -212,11 +212,11 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
212 auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect ); 212 auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect );
213 if( MQTTASYNC_SUCCESS == ccb ) 213 if( MQTTASYNC_SUCCESS == ccb )
214 { 214 {
215 - LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") ); 215 + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
216 } 216 }
217 else 217 else
218 { 218 {
219 - LogDebug( "[ClientPaho]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") ); 219 + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
220 } 220 }
221 221
222 // Setup the last will and testament, if so desired. 222 // Setup the last will and testament, if so desired.
@@ -235,7 +235,6 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt ) @@ -235,7 +235,6 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
235 conn_opts.will = nullptr; 235 conn_opts.will = nullptr;
236 } 236 }
237 237
238 -  
239 if( !m_username.empty() ) 238 if( !m_username.empty() )
240 { 239 {
241 conn_opts.username = m_username.c_str(); 240 conn_opts.username = m_username.c_str();
@@ -311,14 +310,14 @@ std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs ) @@ -311,14 +310,14 @@ std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
311 310
312 { 311 {
313 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 312 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
314 - if (!m_pendingOperations.insert(-200).second) 313 + if( !m_pendingOperations.insert( -200 ).second )
315 { 314 {
316 - //"ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200) 315 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) );
317 } 316 }
318 - m_operationResult.erase(-200); 317 + m_operationResult.erase( -200 );
319 } 318 }
320 319
321 - int rc = MQTTAsync_disconnect(m_client, &disconn_opts); 320 + int rc = MQTTAsync_disconnect( m_client, &disconn_opts );
322 if( MQTTASYNC_SUCCESS != rc ) 321 if( MQTTASYNC_SUCCESS != rc )
323 { 322 {
324 if( MQTTASYNC_DISCONNECTED == rc ) 323 if( MQTTASYNC_DISCONNECTED == rc )
@@ -329,21 +328,20 @@ std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs ) @@ -329,21 +328,20 @@ std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
329 setConnectionStatus( currentStatus ); 328 setConnectionStatus( currentStatus );
330 OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); 329 OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
331 m_operationResult[-200] = false; 330 m_operationResult[-200] = false;
332 - m_pendingOperations.erase(-200); 331 + m_pendingOperations.erase( -200 );
333 332
334 if( MQTTASYNC_DISCONNECTED == rc ) 333 if( MQTTASYNC_DISCONNECTED == rc )
335 { 334 {
336 return -1; 335 return -1;
337 } 336 }
338 - // ("ClientPaho", std::string( "%1 - failed to disconnect, return code %2" ).arg( m_clientId ).arg( pahoAsyncErrorCodeToString(rc)) ); 337 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) );
339 } 338 }
340 339
341 if( wait ) 340 if( wait )
342 { 341 {
343 if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) 342 if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
344 { 343 {
345 - // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId);  
346 - 344 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) );
347 } 345 }
348 waitForDisconnect.get(); 346 waitForDisconnect.get();
349 m_disconnectPromise.reset(); 347 m_disconnectPromise.reset();
@@ -351,22 +349,22 @@ std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs ) @@ -351,22 +349,22 @@ std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
351 return -200; 349 return -200;
352 } 350 }
353 351
354 -std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) 352 +std::int32_t ClientPaho::publish( const MqttMessage& message, int qos )
355 { 353 {
356 if( ConnectionStatus::DisconnectInProgress == m_connectionStatus ) 354 if( ConnectionStatus::DisconnectInProgress == m_connectionStatus )
357 { 355 {
358 - // ("ClientPaho", "%1 - disconnect in progress, ignoring publish with qos %2 on topic %3", m_clientId, qos, message.topic()); 356 + LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) );
359 return -1; 357 return -1;
360 } 358 }
361 else if( ConnectionStatus::Disconnected == m_connectionStatus ) 359 else if( ConnectionStatus::Disconnected == m_connectionStatus )
362 { 360 {
363 - // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); 361 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) );
364 connect( true ); 362 connect( true );
365 } 363 }
366 364
367 if( !isValidTopic(message.topic() ) ) 365 if( !isValidTopic(message.topic() ) )
368 { 366 {
369 - // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, message.topic()); 367 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) );
370 } 368 }
371 369
372 if( qos > 2 ) 370 if( qos > 2 )
@@ -379,13 +377,13 @@ std::int32_t ClientPaho::publish(const MqttMessage&amp; message, int qos) @@ -379,13 +377,13 @@ std::int32_t ClientPaho::publish(const MqttMessage&amp; message, int qos)
379 } 377 }
380 378
381 std::unique_lock<std::mutex> lck(m_mutex); 379 std::unique_lock<std::mutex> lck(m_mutex);
382 - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) 380 + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes )
383 { 381 {
384 m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; }); 382 m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
385 - if(ConnectionStatus::ReconnectInProgress == m_connectionStatus) 383 + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus )
386 { 384 {
387 LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." ); 385 LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." );
388 - m_pendingPublishes.push_front(Publish{ qos, message }); 386 + m_pendingPublishes.push_front( Publish{ qos, message } );
389 return -1; 387 return -1;
390 } 388 }
391 } 389 }
@@ -411,8 +409,7 @@ void ClientPaho::publishPending() @@ -411,8 +409,7 @@ void ClientPaho::publishPending()
411 while( !m_pendingPublishes.empty() ) 409 while( !m_pendingPublishes.empty() )
412 { 410 {
413 const auto& pub = m_pendingPublishes.back(); 411 const auto& pub = m_pendingPublishes.back();
414 - publishInternal(pub.data, pub.qos);  
415 - // else ("ClientPaho", "%1 - pending publish on topic %2 failed : %3", m_clientId, pub.data.topic(), e.what()); 412 + publishInternal( pub.data, pub.qos );
416 413
417 m_pendingPublishes.pop_back(); 414 m_pendingPublishes.pop_back();
418 } 415 }
@@ -482,18 +479,18 @@ std::int32_t ClientPaho::subscribe( const std::string&amp; topic, int qos, const std @@ -482,18 +479,18 @@ std::int32_t ClientPaho::subscribe( const std::string&amp; topic, int qos, const std
482 // (OverlappingTopicException, "overlapping topic", existingTopic, topic); 479 // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
483 } 480 }
484 481
485 - // ("ClientPaho", "%1 - adding subscription on topic %2 to the pending subscriptions", m_clientId, topic);  
486 - m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb } ) ); 482 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
  483 + m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) );
487 } 484 }
488 return subscribeInternal( topic, qos ); 485 return subscribeInternal( topic, qos );
489 } 486 }
490 487
491 void ClientPaho::resubscribe() 488 void ClientPaho::resubscribe()
492 { 489 {
493 - decltype(m_pendingSubscriptions) pendingSubscriptions{}; 490 + decltype( m_pendingSubscriptions ) pendingSubscriptions{};
494 { 491 {
495 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 492 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
496 - std::copy(m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end())); 493 + std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) );
497 } 494 }
498 495
499 for( const auto& s : pendingSubscriptions ) 496 for( const auto& s : pendingSubscriptions )
@@ -530,21 +527,21 @@ std::int32_t ClientPaho::unsubscribe( const std::string&amp; topic, int qos ) @@ -530,21 +527,21 @@ std::int32_t ClientPaho::unsubscribe( const std::string&amp; topic, int qos )
530 // Need to lock the mutex because it is possible that the callback is faster than 527 // Need to lock the mutex because it is possible that the callback is faster than
531 // the insertion of the token into the pending operations. 528 // the insertion of the token into the pending operations.
532 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 529 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
533 - auto rc = MQTTAsync_unsubscribe(m_client, topic.c_str(), &opts); 530 + auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts );
534 if( MQTTASYNC_SUCCESS != rc ) 531 if( MQTTASYNC_SUCCESS != rc )
535 { 532 {
536 - // ("ClientPaho", "%1 - unsubscribe on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); 533 + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
537 } 534 }
538 535
539 if( !m_pendingOperations.insert( opts.token ).second ) 536 if( !m_pendingOperations.insert( opts.token ).second )
540 { 537 {
541 - // ("ClientPaho", "%1 unsubscribe - token %2 already in use", m_clientId, opts.token); 538 + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token ) + " already in use" ) );
542 } 539 }
543 540
544 m_operationResult.erase( opts.token ); 541 m_operationResult.erase( opts.token );
545 if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 ) 542 if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 )
546 { 543 {
547 - // ("ClientPaho", "%1 - token already in use, replacing unsubscribe from topic %2 with topic %3", m_clientId, m_unsubscribeTokenToTopic[opts.token], topic); 544 + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) );
548 } 545 }
549 m_lastUnsubscribe = opts.token; // centos7 workaround 546 m_lastUnsubscribe = opts.token; // centos7 workaround
550 m_unsubscribeTokenToTopic[opts.token] = topic; 547 m_unsubscribeTokenToTopic[opts.token] = topic;
@@ -566,32 +563,34 @@ void ClientPaho::unsubscribeAll() @@ -566,32 +563,34 @@ void ClientPaho::unsubscribeAll()
566 563
567 for( const auto& s : subscriptions ) 564 for( const auto& s : subscriptions )
568 { 565 {
569 - this->unsubscribe(s.first, s.second.qos); 566 + this->unsubscribe( s.first, s.second.qos );
570 } 567 }
571 } 568 }
572 569
573 std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const 570 std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const
574 { 571 {
575 - if (waitFor <= std::chrono::milliseconds(0)) {  
576 - return std::chrono::milliseconds(0); 572 + if( waitFor <= std::chrono::milliseconds( 0 ) )
  573 + {
  574 + return std::chrono::milliseconds( 0 );
577 } 575 }
578 std::chrono::milliseconds timeElapsed{}; 576 std::chrono::milliseconds timeElapsed{};
579 { 577 {
580 - osdev::components::mqtt::measurement::TimeMeasurement msr("waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds) 578 + osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds )
581 { 579 {
582 - timeElapsed = std::chrono::ceil<std::chrono::milliseconds>(sinceStart); 580 + timeElapsed = std::chrono::ceil<std::chrono::milliseconds>( sinceStart );
583 }); 581 });
584 std::unique_lock<std::mutex> lck(m_mutex); 582 std::unique_lock<std::mutex> lck(m_mutex);
  583 +
585 // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations); 584 // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations);
586 m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]() 585 m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]()
587 { 586 {
588 - if (tokens.empty()) 587 + if( tokens.empty() )
589 { // wait for all operations to end 588 { // wait for all operations to end
590 return m_pendingOperations.empty(); 589 return m_pendingOperations.empty();
591 } 590 }
592 - else if (tokens.size() == 1) 591 + else if( tokens.size() == 1 )
593 { 592 {
594 - return m_pendingOperations.find(*tokens.cbegin()) == m_pendingOperations.end(); 593 + return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end();
595 } 594 }
596 std::vector<std::int32_t> intersect{}; 595 std::vector<std::int32_t> intersect{};
597 std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect)); 596 std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect));
@@ -601,16 +600,16 @@ std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::millisecond @@ -601,16 +600,16 @@ std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::millisecond
601 return timeElapsed; 600 return timeElapsed;
602 } 601 }
603 602
604 -bool ClientPaho::isOverlapping(const std::string& topic) const 603 +bool ClientPaho::isOverlapping( const std::string& topic ) const
605 { 604 {
606 std::string existingTopic{}; 605 std::string existingTopic{};
607 - return isOverlapping(topic, existingTopic); 606 + return isOverlapping( topic, existingTopic );
608 } 607 }
609 608
610 -bool ClientPaho::isOverlapping(const std::string& topic, std::string& existingTopic) const 609 +bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const
611 { 610 {
612 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 611 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
613 - return isOverlappingInternal(topic, existingTopic); 612 + return isOverlappingInternal( topic, existingTopic );
614 } 613 }
615 614
616 std::vector<std::int32_t> ClientPaho::pendingOperations() const 615 std::vector<std::int32_t> ClientPaho::pendingOperations() const
@@ -618,7 +617,7 @@ std::vector&lt;std::int32_t&gt; ClientPaho::pendingOperations() const @@ -618,7 +617,7 @@ std::vector&lt;std::int32_t&gt; ClientPaho::pendingOperations() const
618 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 617 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
619 std::vector<std::int32_t> retval{}; 618 std::vector<std::int32_t> retval{};
620 retval.resize(m_pendingOperations.size()); 619 retval.resize(m_pendingOperations.size());
621 - std::copy(m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin()); 620 + std::copy( m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin() );
622 return retval; 621 return retval;
623 } 622 }
624 623
@@ -628,36 +627,36 @@ bool ClientPaho::hasPendingSubscriptions() const @@ -628,36 +627,36 @@ bool ClientPaho::hasPendingSubscriptions() const
628 return !m_pendingSubscriptions.empty(); 627 return !m_pendingSubscriptions.empty();
629 } 628 }
630 629
631 -boost::optional<bool> ClientPaho::operationResult(std::int32_t token) const 630 +boost::optional<bool> ClientPaho::operationResult( std::int32_t token ) const
632 { 631 {
633 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 632 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
634 boost::optional<bool> ret{}; 633 boost::optional<bool> ret{};
635 - auto cit = m_operationResult.find(token);  
636 - if (m_operationResult.end() != cit) 634 + auto cit = m_operationResult.find( token );
  635 + if( m_operationResult.end() != cit )
637 { 636 {
638 ret = cit->second; 637 ret = cit->second;
639 } 638 }
640 return ret; 639 return ret;
641 } 640 }
642 641
643 -void ClientPaho::parseEndpoint(const std::string& _endpoint) 642 +void ClientPaho::parseEndpoint( const std::string& _endpoint )
644 { 643 {
645 - auto ep = UriParser::parse(_endpoint);  
646 - if (ep.find("user") != ep.end()) 644 + auto ep = UriParser::parse( _endpoint );
  645 + if( ep.find( "user" ) != ep.end() )
647 { 646 {
648 m_username = ep["user"]; 647 m_username = ep["user"];
649 ep["user"].clear(); 648 ep["user"].clear();
650 } 649 }
651 650
652 - if (ep.find("password") != ep.end()) 651 + if( ep.find( "password" ) != ep.end() )
653 { 652 {
654 m_password = ep["password"]; 653 m_password = ep["password"];
655 ep["password"].clear(); 654 ep["password"].clear();
656 } 655 }
657 - m_endpoint = UriParser::toString(ep); 656 + m_endpoint = UriParser::toString( ep );
658 } 657 }
659 658
660 -std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos) 659 +std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos )
661 { 660 {
662 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; 661 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
663 opts.onSuccess = &ClientPaho::onPublishSuccess; 662 opts.onSuccess = &ClientPaho::onPublishSuccess;
@@ -671,20 +670,20 @@ std::int32_t ClientPaho::publishInternal(const MqttMessage&amp; message, int qos) @@ -671,20 +670,20 @@ std::int32_t ClientPaho::publishInternal(const MqttMessage&amp; message, int qos)
671 670
672 // OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 671 // OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
673 auto rc = MQTTAsync_sendMessage(m_client, message.topic().c_str(), &msg, &opts); 672 auto rc = MQTTAsync_sendMessage(m_client, message.topic().c_str(), &msg, &opts);
674 - if (MQTTASYNC_SUCCESS != rc) 673 + if( MQTTASYNC_SUCCESS != rc )
675 { 674 {
676 - // ("ClientPaho", "%1 - publish on topic %2 failed with code %3", m_clientId, message.topic(), pahoAsyncErrorCodeToString(rc)); 675 + LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
677 } 676 }
678 677
679 - if (!m_pendingOperations.insert(opts.token).second) 678 + if( !m_pendingOperations.insert( opts.token ).second )
680 { 679 {
681 - // ("ClientPaho", "%1 publishInternal - token %2 already in use", m_clientId, opts.token); 680 + LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
682 } 681 }
683 - m_operationResult.erase(opts.token); 682 + m_operationResult.erase( opts.token );
684 return opts.token; 683 return opts.token;
685 } 684 }
686 685
687 -std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos) 686 +std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos )
688 { 687 {
689 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; 688 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
690 opts.onSuccess = &ClientPaho::onSubscribeSuccess; 689 opts.onSuccess = &ClientPaho::onSubscribeSuccess;
@@ -697,19 +696,18 @@ std::int32_t ClientPaho::subscribeInternal(const std::string&amp; topic, int qos) @@ -697,19 +696,18 @@ std::int32_t ClientPaho::subscribeInternal(const std::string&amp; topic, int qos)
697 auto rc = MQTTAsync_subscribe(m_client, topic.c_str(), qos, &opts); 696 auto rc = MQTTAsync_subscribe(m_client, topic.c_str(), qos, &opts);
698 if (MQTTASYNC_SUCCESS != rc) 697 if (MQTTASYNC_SUCCESS != rc)
699 { 698 {
700 - m_pendingSubscriptions.erase(topic);  
701 - // ("ClientPaho", "%1 - subscription on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc));  
702 - // (MqttException, "Subscription failed"); 699 + m_pendingSubscriptions.erase( topic );
  700 + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
703 } 701 }
704 702
705 - if (!m_pendingOperations.insert(opts.token).second) 703 + if( !m_pendingOperations.insert( opts.token ).second )
706 { 704 {
707 - // ("ClientPaho", "%1 subscribe - token %2 already in use", m_clientId, opts.token); 705 + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) );
708 } 706 }
709 - m_operationResult.erase(opts.token);  
710 - if (m_subscribeTokenToTopic.count(opts.token) > 0) 707 + m_operationResult.erase( opts.token );
  708 + if( m_subscribeTokenToTopic.count( opts.token ) > 0 )
711 { 709 {
712 - // ("ClientPaho", "%1 - overwriting pending subscription on topic %2 with topic %3", m_clientId, m_subscribeTokenToTopic[opts.token], topic); 710 + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) );
713 } 711 }
714 m_subscribeTokenToTopic[opts.token] = topic; 712 m_subscribeTokenToTopic[opts.token] = topic;
715 return opts.token; 713 return opts.token;
@@ -755,7 +753,7 @@ void ClientPaho::pushIncomingEvent(std::function&lt;void()&gt; ev) @@ -755,7 +753,7 @@ void ClientPaho::pushIncomingEvent(std::function&lt;void()&gt; ev)
755 753
756 void ClientPaho::callbackEventHandler() 754 void ClientPaho::callbackEventHandler()
757 { 755 {
758 - LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler") ); 756 + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) );
759 for( ;; ) 757 for( ;; )
760 { 758 {
761 std::vector<std::function<void()>> events; 759 std::vector<std::function<void()>> events;
@@ -773,7 +771,7 @@ void ClientPaho::callbackEventHandler() @@ -773,7 +771,7 @@ void ClientPaho::callbackEventHandler()
773 } 771 }
774 void ClientPaho::onConnectOnInstance( const std::string& cause ) 772 void ClientPaho::onConnectOnInstance( const std::string& cause )
775 { 773 {
776 - (void)cause; 774 + (void) cause;
777 { 775 {
778 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 776 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
779 std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end())); 777 std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end()));
@@ -809,7 +807,7 @@ void ClientPaho::onConnectSuccessOnInstance() @@ -809,7 +807,7 @@ void ClientPaho::onConnectSuccessOnInstance()
809 setConnectionStatus( ConnectionStatus::Connected ); 807 setConnectionStatus( ConnectionStatus::Connected );
810 if( m_connectPromise ) 808 if( m_connectPromise )
811 { 809 {
812 - LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!") ); 810 + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) );
813 m_connectPromise->set_value(); 811 m_connectPromise->set_value();
814 } 812 }
815 m_operationsCompleteCV.notify_all(); 813 m_operationsCompleteCV.notify_all();
@@ -825,9 +823,9 @@ void ClientPaho::onConnectFailureOnInstance( const MqttFailure&amp; response ) @@ -825,9 +823,9 @@ void ClientPaho::onConnectFailureOnInstance( const MqttFailure&amp; response )
825 m_operationResult[-100] = false; 823 m_operationResult[-100] = false;
826 m_pendingOperations.erase(-100); 824 m_pendingOperations.erase(-100);
827 } 825 }
828 - if (ConnectionStatus::ConnectInProgress == m_connectionStatus) 826 + if( ConnectionStatus::ConnectInProgress == m_connectionStatus )
829 { 827 {
830 - setConnectionStatus(ConnectionStatus::Disconnected); 828 + setConnectionStatus( ConnectionStatus::Disconnected );
831 } 829 }
832 m_operationsCompleteCV.notify_all(); 830 m_operationsCompleteCV.notify_all();
833 } 831 }
@@ -837,9 +835,9 @@ void ClientPaho::onConnectFailureOnInstance( const MqttFailure&amp; response ) @@ -837,9 +835,9 @@ void ClientPaho::onConnectFailureOnInstance( const MqttFailure&amp; response )
837 // MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode)); 835 // MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode));
838 //} 836 //}
839 837
840 -void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&) 838 +void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& )
841 { 839 {
842 - // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - disconnected from endpoint %2", m_clientId, m_endpoint); 840 + LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) );
843 { 841 {
844 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 842 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
845 m_subscriptions.clear(); 843 m_subscriptions.clear();
@@ -864,9 +862,9 @@ void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&amp;) @@ -864,9 +862,9 @@ void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&amp;)
864 void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response ) 862 void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response )
865 { 863 {
866 (void) response; 864 (void) response;
867 - // ("ClientPaho", "onDisconnectFailureOnInstance %1 - disconnect failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); 865 + LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) );
868 { 866 {
869 - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 867 + OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
870 // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations); 868 // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations);
871 m_operationResult[-200] = false; 869 m_operationResult[-200] = false;
872 m_pendingOperations.erase(-200); 870 m_pendingOperations.erase(-200);
@@ -891,7 +889,7 @@ void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure&amp; response ) @@ -891,7 +889,7 @@ void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure&amp; response )
891 void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response ) 889 void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
892 { 890 {
893 auto pd = response.publishData(); 891 auto pd = response.publishData();
894 - // ("ClientPaho", "onPublishSuccessOnInstance %1 - publish with token %2 succeeded (message was %3)", m_clientId, response.token(), pd.payload()); 892 + LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
895 { 893 {
896 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 894 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
897 // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); 895 // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
@@ -903,7 +901,7 @@ void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess&amp; response ) @@ -903,7 +901,7 @@ void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess&amp; response )
903 901
904 void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response ) 902 void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response )
905 { 903 {
906 - // ("ClientPaho", "onPublishFailureOnInstance %1 - publish with token %2 failed with code %3 (%4)", m_clientId, response.token(), response.codeToString(), response.message()); 904 + LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
907 { 905 {
908 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 906 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
909 // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); 907 // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
@@ -915,77 +913,80 @@ void ClientPaho::onPublishFailureOnInstance( const MqttFailure&amp; response ) @@ -915,77 +913,80 @@ void ClientPaho::onPublishFailureOnInstance( const MqttFailure&amp; response )
915 913
916 void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response ) 914 void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response )
917 { 915 {
918 - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscribe with token %2 succeeded", m_clientId, response.token());  
919 - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); 916 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) );
  917 +
  918 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
920 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 919 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
921 bool operationOk = false; 920 bool operationOk = false;
922 - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response, &operationOk]() 921 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]()
923 { 922 {
924 // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); 923 // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
925 m_operationResult[response.token()] = operationOk; 924 m_operationResult[response.token()] = operationOk;
926 - m_pendingOperations.erase(response.token()); 925 + m_pendingOperations.erase( response.token() );
927 }); 926 });
928 - auto it = m_subscribeTokenToTopic.find(response.token());  
929 - if (m_subscribeTokenToTopic.end() == it) 927 + auto it = m_subscribeTokenToTopic.find( response.token() );
  928 + if( m_subscribeTokenToTopic.end() == it )
930 { 929 {
931 - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, response.token()); 930 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
932 return; 931 return;
933 } 932 }
934 auto topic = it->second; 933 auto topic = it->second;
935 m_subscribeTokenToTopic.erase(it); 934 m_subscribeTokenToTopic.erase(it);
936 935
937 - auto pendingIt = m_pendingSubscriptions.find(topic);  
938 - if (m_pendingSubscriptions.end() == pendingIt) 936 + auto pendingIt = m_pendingSubscriptions.find( topic );
  937 + if( m_pendingSubscriptions.end() == pendingIt )
939 { 938 {
940 - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); 939 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
941 return; 940 return;
942 } 941 }
943 - if (response.qos() != pendingIt->second.qos) 942 + if( response.qos() != pendingIt->second.qos )
944 { 943 {
945 - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscription requested qos %2, endpoint assigned qos %3", m_clientId, pendingIt->second.qos, response.qos()); 944 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) );
946 } 945 }
947 - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - move pending subscription on topic %2 to the registered subscriptions", m_clientId, topic);  
948 - m_subscriptions.emplace(std::make_pair(pendingIt->first, std::move(pendingIt->second)));  
949 - m_pendingSubscriptions.erase(pendingIt); 946 +
  947 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) );
  948 + m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) );
  949 + m_pendingSubscriptions.erase( pendingIt );
950 operationOk = true; 950 operationOk = true;
951 } 951 }
952 952
953 -void ClientPaho::onSubscribeFailureOnInstance(const MqttFailure& response) 953 +void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response )
954 { 954 {
955 - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());  
956 - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); 955 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
  956 +
  957 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
957 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 958 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
958 - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() 959 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
959 { 960 {
960 // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); 961 // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
961 m_operationResult[response.token()] = false; 962 m_operationResult[response.token()] = false;
962 - m_pendingOperations.erase(response.token()); 963 + m_pendingOperations.erase( response.token() );
963 }); 964 });
964 965
965 - auto it = m_subscribeTokenToTopic.find(response.token());  
966 - if (m_subscribeTokenToTopic.end() == it) 966 + auto it = m_subscribeTokenToTopic.find( response.token() );
  967 + if( m_subscribeTokenToTopic.end() == it )
967 { 968 {
968 - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); 969 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
969 return; 970 return;
970 } 971 }
971 auto topic = it->second; 972 auto topic = it->second;
972 - m_subscribeTokenToTopic.erase(it); 973 + m_subscribeTokenToTopic.erase( it );
973 974
974 - auto pendingIt = m_pendingSubscriptions.find(topic);  
975 - if (m_pendingSubscriptions.end() == pendingIt) 975 + auto pendingIt = m_pendingSubscriptions.find( topic );
  976 + if( m_pendingSubscriptions.end() == pendingIt )
976 { 977 {
977 - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); 978 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
978 return; 979 return;
979 } 980 }
980 - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - remove pending subscription on topic %2", m_clientId, topic);  
981 - m_pendingSubscriptions.erase(pendingIt); 981 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) );
  982 + m_pendingSubscriptions.erase( pendingIt );
982 } 983 }
983 984
984 -void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response) 985 +void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response )
985 { 986 {
986 - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unsubscribe with token %2 succeeded", m_clientId, response.token()); 987 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) );
987 988
988 - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); 989 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
989 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 990 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
990 991
991 // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token. 992 // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token.
@@ -994,116 +995,117 @@ void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess&amp; response) @@ -994,116 +995,117 @@ void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess&amp; response)
994 // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled 995 // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled
995 // sequentially (see ClientPaho::unsubscribe)! 996 // sequentially (see ClientPaho::unsubscribe)!
996 auto token = response.token(); 997 auto token = response.token();
997 - if (-1 == token) 998 + if( -1 == token )
998 { 999 {
999 token = m_lastUnsubscribe; 1000 token = m_lastUnsubscribe;
1000 m_lastUnsubscribe = -1; 1001 m_lastUnsubscribe = -1;
1001 } 1002 }
1002 1003
1003 bool operationOk = false; 1004 bool operationOk = false;
1004 - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, token, &operationOk]() 1005 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]()
1005 { 1006 {
1006 // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token); 1007 // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token);
1007 m_operationResult[token] = operationOk; 1008 m_operationResult[token] = operationOk;
1008 - m_pendingOperations.erase(token); 1009 + m_pendingOperations.erase( token );
1009 }); 1010 });
1010 1011
1011 - auto it = m_unsubscribeTokenToTopic.find(token);  
1012 - if (m_unsubscribeTokenToTopic.end() == it) 1012 + auto it = m_unsubscribeTokenToTopic.find( token );
  1013 + if( m_unsubscribeTokenToTopic.end() == it )
1013 { 1014 {
1014 - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, token); 1015 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) );
1015 return; 1016 return;
1016 } 1017 }
1017 auto topic = it->second; 1018 auto topic = it->second;
1018 - m_unsubscribeTokenToTopic.erase(it); 1019 + m_unsubscribeTokenToTopic.erase( it );
1019 1020
1020 - auto registeredIt = m_subscriptions.find(topic);  
1021 - if (m_subscriptions.end() == registeredIt) {  
1022 - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - cannot find subscription for token %2", m_clientId, response.token()); 1021 + auto registeredIt = m_subscriptions.find( topic );
  1022 + if( m_subscriptions.end() == registeredIt )
  1023 + {
  1024 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) );
1023 return; 1025 return;
1024 } 1026 }
1025 - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - remove subscription on topic %2 from the registered subscriptions", m_clientId, topic);  
1026 - m_subscriptions.erase(registeredIt); 1027 +
  1028 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) );
  1029 + m_subscriptions.erase( registeredIt );
1027 operationOk = true; 1030 operationOk = true;
1028 } 1031 }
1029 1032
1030 -void ClientPaho::onUnsubscribeFailureOnInstance(const MqttFailure& response) 1033 +void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response )
1031 { 1034 {
1032 - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());  
1033 - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); 1035 + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
  1036 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
1034 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 1037 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
1035 - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() 1038 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
1036 { 1039 {
1037 // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); 1040 // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
1038 m_operationResult[response.token()] = false; 1041 m_operationResult[response.token()] = false;
1039 - m_pendingOperations.erase(response.token()); 1042 + m_pendingOperations.erase( response.token() );
1040 }); 1043 });
1041 1044
1042 - auto it = m_unsubscribeTokenToTopic.find(response.token());  
1043 - if (m_unsubscribeTokenToTopic.end() == it) 1045 + auto it = m_unsubscribeTokenToTopic.find( response.token() );
  1046 + if( m_unsubscribeTokenToTopic.end() == it )
1044 { 1047 {
1045 - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); 1048 + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
1046 return; 1049 return;
1047 } 1050 }
1048 auto topic = it->second; 1051 auto topic = it->second;
1049 - m_unsubscribeTokenToTopic.erase(it); 1052 + m_unsubscribeTokenToTopic.erase( it );
1050 } 1053 }
1051 1054
1052 -int ClientPaho::onMessageArrivedOnInstance(const MqttMessage& message) 1055 +int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message )
1053 { 1056 {
1054 - // ("ClientPaho", "onMessageArrivedOnInstance %1 - received message on topic %2, retained : %3, dup : %4", m_clientId, message.topic(), message.retained(), message.duplicate());  
1055 -  
1056 - std::function<void(MqttMessage)> cb; 1057 + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) );
1057 1058
  1059 + std::function<void( MqttMessage )> cb;
1058 { 1060 {
1059 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 1061 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
1060 - for (const auto& s : m_subscriptions) 1062 + for( const auto& s : m_subscriptions )
1061 { 1063 {
1062 - if (boost::regex_match(message.topic(), s.second.topicRegex)) 1064 + if( boost::regex_match( message.topic(), s.second.topicRegex ) )
1063 { 1065 {
1064 cb = s.second.callback; 1066 cb = s.second.callback;
1065 } 1067 }
1066 } 1068 }
1067 } 1069 }
1068 1070
1069 - if (cb) 1071 + if( cb )
1070 { 1072 {
1071 - cb(message); 1073 + cb( message );
1072 } 1074 }
1073 else 1075 else
1074 { 1076 {
1075 - // ("ClientPaho", "onMessageArrivedOnInstance %1 - no topic filter found for message received on topic %2", m_clientId, message.topic()); 1077 + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) );
1076 } 1078 }
1077 return 1; 1079 return 1;
1078 } 1080 }
1079 1081
1080 -void ClientPaho::onDeliveryCompleteOnInstance(MQTTAsync_token token) 1082 +void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token )
1081 { 1083 {
1082 - // ("ClientPaho", "onDeliveryCompleteOnInstance %1 - message with token %2 is delivered", m_clientId, token);  
1083 - if (m_deliveryCompleteCallback) 1084 + LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) );
  1085 + if( m_deliveryCompleteCallback )
1084 { 1086 {
1085 - m_deliveryCompleteCallback(m_clientId, static_cast<std::int32_t>(token)); 1087 + m_deliveryCompleteCallback( m_clientId, static_cast<std::int32_t>( token ) );
1086 } 1088 }
1087 } 1089 }
1088 1090
1089 -void ClientPaho::onConnectionLostOnInstance(const std::string& cause) 1091 +void ClientPaho::onConnectionLostOnInstance( const std::string& cause )
1090 { 1092 {
1091 - (void)cause; 1093 + (void) cause;
1092 // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause); 1094 // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause);
1093 - setConnectionStatus(ConnectionStatus::ReconnectInProgress); 1095 + setConnectionStatus( ConnectionStatus::ReconnectInProgress );
1094 1096
1095 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 1097 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
1096 // Remove all tokens related to subscriptions from the active operations. 1098 // Remove all tokens related to subscriptions from the active operations.
1097 - for (const auto& p : m_subscribeTokenToTopic) 1099 + for( const auto& p : m_subscribeTokenToTopic )
1098 { 1100 {
1099 // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); 1101 // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
1100 - m_pendingOperations.erase(p.first); 1102 + m_pendingOperations.erase( p.first );
1101 } 1103 }
1102 1104
1103 - for (const auto& p : m_unsubscribeTokenToTopic) 1105 + for( const auto& p : m_unsubscribeTokenToTopic )
1104 { 1106 {
1105 // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); 1107 // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
1106 - m_pendingOperations.erase(p.first); 1108 + m_pendingOperations.erase( p.first );
1107 } 1109 }
1108 // Clear the administration used in the subscribe process. 1110 // Clear the administration used in the subscribe process.
1109 m_subscribeTokenToTopic.clear(); 1111 m_subscribeTokenToTopic.clear();
@@ -1111,55 +1113,55 @@ void ClientPaho::onConnectionLostOnInstance(const std::string&amp; cause) @@ -1111,55 +1113,55 @@ void ClientPaho::onConnectionLostOnInstance(const std::string&amp; cause)
1111 } 1113 }
1112 1114
1113 // static 1115 // static
1114 -void ClientPaho::onFirstConnect(void* context, char* cause) 1116 +void ClientPaho::onFirstConnect( void* context, char* cause )
1115 { 1117 {
1116 LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." ); 1118 LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." );
1117 - if(context) 1119 + if( context )
1118 { 1120 {
1119 - auto *cl = reinterpret_cast<ClientPaho*>(context);  
1120 - std::string reason(nullptr == cause ? "Unknown cause" : cause);  
1121 - cl->pushIncomingEvent([cl, reason]() { cl->onConnectSuccessOnInstance(); }); 1121 + auto *cl = reinterpret_cast<ClientPaho*>( context );
  1122 + std::string reason( nullptr == cause ? "Unknown cause" : cause );
  1123 + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } );
1122 } 1124 }
1123 } 1125 }
1124 1126
1125 -void ClientPaho::onConnect(void* context, char* cause) 1127 +void ClientPaho::onConnect( void* context, char* cause )
1126 { 1128 {
1127 LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." ); 1129 LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." );
1128 - if (context) 1130 + if( context )
1129 { 1131 {
1130 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1131 - std::string reason(nullptr == cause ? "unknown cause" : cause);  
1132 - cl->pushIncomingEvent([cl, reason]() { cl->onConnectOnInstance(reason); }); 1132 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1133 + std::string reason( nullptr == cause ? "unknown cause" : cause );
  1134 + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } );
1133 } 1135 }
1134 } 1136 }
1135 1137
1136 // static 1138 // static
1137 -void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response) 1139 +void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response )
1138 { 1140 {
1139 LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." ); 1141 LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." );
1140 - if (context) 1142 + if( context )
1141 { 1143 {
1142 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1143 - if (!response) 1144 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1145 + if( !response )
1144 { 1146 {
1145 // connect should always have a valid response struct. 1147 // connect should always have a valid response struct.
1146 - LogError( "[ClientPaho]", "onConnectSuccess - no response data"); 1148 + LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" );
1147 return; 1149 return;
1148 } 1150 }
1149 // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent)); 1151 // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
1150 - cl->pushIncomingEvent([cl]() { cl->onConnectSuccessOnInstance(); }); 1152 + cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } );
1151 } 1153 }
1152 } 1154 }
1153 1155
1154 // static 1156 // static
1155 -void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response) 1157 +void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response )
1156 { 1158 {
1157 - LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ));  
1158 - if (context) 1159 + LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) );
  1160 + if( context )
1159 { 1161 {
1160 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1161 - MqttFailure resp(response);  
1162 - cl->pushIncomingEvent([cl, resp]() { cl->onConnectFailureOnInstance(resp); }); 1162 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1163 + MqttFailure resp( response );
  1164 + cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } );
1163 } 1165 }
1164 } 1166 }
1165 1167
@@ -1184,35 +1186,35 @@ void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response @@ -1184,35 +1186,35 @@ void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response
1184 //} 1186 //}
1185 1187
1186 // static 1188 // static
1187 -void ClientPaho::onDisconnectSuccess(void* context, MQTTAsync_successData* response) 1189 +void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response )
1188 { 1190 {
1189 - if (context) 1191 + if( context )
1190 { 1192 {
1191 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1192 - MqttSuccess resp(response ? response->token : 0);  
1193 - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectSuccessOnInstance(resp); }); 1193 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1194 + MqttSuccess resp( response ? response->token : 0 );
  1195 + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } );
1194 } 1196 }
1195 } 1197 }
1196 1198
1197 // static 1199 // static
1198 -void ClientPaho::onDisconnectFailure(void* context, MQTTAsync_failureData* response) 1200 +void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response )
1199 { 1201 {
1200 LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." ); 1202 LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." );
1201 - if (context) 1203 + if( context )
1202 { 1204 {
1203 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1204 - MqttFailure resp(response);  
1205 - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectFailureOnInstance(resp); }); 1205 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1206 + MqttFailure resp( response );
  1207 + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } );
1206 } 1208 }
1207 } 1209 }
1208 1210
1209 // static 1211 // static
1210 -void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response) 1212 +void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response )
1211 { 1213 {
1212 - if (context) 1214 + if( context )
1213 { 1215 {
1214 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1215 - if (!response) 1216 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1217 + if( !response )
1216 { 1218 {
1217 // publish should always have a valid response struct. 1219 // publish should always have a valid response struct.
1218 // toLogFile ("ClientPaho", "onPublishSuccess - no response data"); 1220 // toLogFile ("ClientPaho", "onPublishSuccess - no response data");
@@ -1223,134 +1225,137 @@ void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response @@ -1223,134 +1225,137 @@ void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response
1223 } 1225 }
1224 1226
1225 // static 1227 // static
1226 -void ClientPaho::onPublishFailure(void* context, MQTTAsync_failureData* response) 1228 +void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response )
1227 { 1229 {
1228 - (void)response;  
1229 - if (context) 1230 + (void) response;
  1231 + if( context )
1230 { 1232 {
1231 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1232 - MqttFailure resp(response);  
1233 - cl->pushIncomingEvent([cl, resp]() { cl->onPublishFailureOnInstance(resp); }); 1233 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1234 + MqttFailure resp( response );
  1235 + cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } );
1234 } 1236 }
1235 } 1237 }
1236 1238
1237 // static 1239 // static
1238 -void ClientPaho::onSubscribeSuccess(void* context, MQTTAsync_successData* response) 1240 +void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response )
1239 { 1241 {
1240 - if (context) 1242 + if( context )
1241 { 1243 {
1242 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1243 - if (!response) 1244 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1245 + if( !response )
1244 { 1246 {
1245 // subscribe should always have a valid response struct. 1247 // subscribe should always have a valid response struct.
1246 // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data"); 1248 // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data");
1247 } 1249 }
1248 - MqttSuccess resp(response->token, response->alt.qos);  
1249 - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeSuccessOnInstance(resp); }); 1250 + MqttSuccess resp( response->token, response->alt.qos );
  1251 + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } );
1250 } 1252 }
1251 } 1253 }
1252 1254
1253 // static 1255 // static
1254 -void ClientPaho::onSubscribeFailure(void* context, MQTTAsync_failureData* response) 1256 +void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response )
1255 { 1257 {
1256 - if (context) 1258 + if( context )
1257 { 1259 {
1258 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1259 - MqttFailure resp(response);  
1260 - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeFailureOnInstance(resp); }); 1260 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1261 + MqttFailure resp( response );
  1262 + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } );
1261 } 1263 }
1262 } 1264 }
1263 1265
1264 // static 1266 // static
1265 -void ClientPaho::onUnsubscribeSuccess(void* context, MQTTAsync_successData* response) 1267 +void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response )
1266 { 1268 {
1267 - if (context) 1269 + if( context )
1268 { 1270 {
1269 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1270 - MqttSuccess resp(response ? response->token : -1);  
1271 - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeSuccessOnInstance(resp); }); 1271 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1272 + MqttSuccess resp( response ? response->token : -1 );
  1273 + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } );
1272 } 1274 }
1273 } 1275 }
1274 1276
1275 // static 1277 // static
1276 -void ClientPaho::onUnsubscribeFailure(void* context, MQTTAsync_failureData* response) 1278 +void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response )
1277 { 1279 {
1278 - if (context) 1280 + if( context )
1279 { 1281 {
1280 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1281 - MqttFailure resp(response);  
1282 - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeFailureOnInstance(resp); }); 1282 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1283 + MqttFailure resp( response );
  1284 + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } );
1283 } 1285 }
1284 } 1286 }
1285 1287
1286 // static 1288 // static
1287 -int ClientPaho::onMessageArrived(void* context, char* topicName, int, MQTTAsync_message* message) 1289 +int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message )
1288 { 1290 {
1289 1291
1290 - OSDEV_COMPONENTS_SCOPEGUARD(freeMessage, [&topicName, &message]() 1292 + OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]()
1291 { 1293 {
1292 - MQTTAsync_freeMessage(&message);  
1293 - MQTTAsync_free(topicName); 1294 + MQTTAsync_freeMessage( &message );
  1295 + MQTTAsync_free( topicName );
1294 }); 1296 });
1295 1297
1296 - if (context) 1298 + if( context )
1297 { 1299 {
1298 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1299 - MqttMessage msg(topicName, *message);  
1300 - cl->pushIncomingEvent([cl, msg]() { cl->onMessageArrivedOnInstance(msg); }); 1300 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1301 + MqttMessage msg( topicName, *message );
  1302 + cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } );
1301 } 1303 }
1302 1304
1303 return 1; // always return true. Otherwise this callback is triggered again. 1305 return 1; // always return true. Otherwise this callback is triggered again.
1304 } 1306 }
1305 1307
1306 // static 1308 // static
1307 -void ClientPaho::onDeliveryComplete(void* context, MQTTAsync_token token) 1309 +void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token )
1308 { 1310 {
1309 - if (context) 1311 + if( context )
1310 { 1312 {
1311 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1312 - cl->pushIncomingEvent([cl, token]() { cl->onDeliveryCompleteOnInstance(token); }); 1313 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1314 + cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } );
1313 } 1315 }
1314 } 1316 }
1315 1317
1316 // static 1318 // static
1317 -void ClientPaho::onConnectionLost(void* context, char* cause) 1319 +void ClientPaho::onConnectionLost( void* context, char* cause )
1318 { 1320 {
1319 - OSDEV_COMPONENTS_SCOPEGUARD(freeCause, [&cause]() 1321 + OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]()
1320 { 1322 {
1321 - if (cause) 1323 + if( cause )
1322 { 1324 {
1323 - MQTTAsync_free(cause); 1325 + MQTTAsync_free( cause );
1324 } 1326 }
1325 }); 1327 });
1326 1328
1327 - if (context) 1329 + if( context )
1328 { 1330 {
1329 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1330 - std::string msg(nullptr == cause ? "cause unknown" : cause);  
1331 - cl->pushIncomingEvent([cl, msg]() { cl->onConnectionLostOnInstance(msg); }); 1331 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1332 + std::string msg( nullptr == cause ? "cause unknown" : cause );
  1333 + cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } );
1332 } 1334 }
1333 } 1335 }
1334 1336
1335 // static 1337 // static
1336 -void ClientPaho::onLogPaho(enum MQTTASYNC_TRACE_LEVELS level, char* message) 1338 +void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message )
1337 { 1339 {
1338 - (void)message;  
1339 - switch (level) 1340 + (void) message;
  1341 + switch( level )
1340 { 1342 {
1341 case MQTTASYNC_TRACE_MAXIMUM: 1343 case MQTTASYNC_TRACE_MAXIMUM:
1342 case MQTTASYNC_TRACE_MEDIUM: 1344 case MQTTASYNC_TRACE_MEDIUM:
1343 - case MQTTASYNC_TRACE_MINIMUM: { 1345 + case MQTTASYNC_TRACE_MINIMUM:
  1346 + {
1344 // ("ClientPaho", "paho - %1", message) 1347 // ("ClientPaho", "paho - %1", message)
1345 break; 1348 break;
1346 } 1349 }
1347 - case MQTTASYNC_TRACE_PROTOCOL: { 1350 + case MQTTASYNC_TRACE_PROTOCOL:
  1351 + {
1348 // ("ClientPaho", "paho - %1", message) 1352 // ("ClientPaho", "paho - %1", message)
1349 break; 1353 break;
1350 } 1354 }
1351 case MQTTASYNC_TRACE_ERROR: 1355 case MQTTASYNC_TRACE_ERROR:
1352 case MQTTASYNC_TRACE_SEVERE: 1356 case MQTTASYNC_TRACE_SEVERE:
1353 - case MQTTASYNC_TRACE_FATAL: { 1357 + case MQTTASYNC_TRACE_FATAL:
  1358 + {
1354 // ("ClientPaho", "paho - %1", message) 1359 // ("ClientPaho", "paho - %1", message)
1355 break; 1360 break;
1356 } 1361 }