Commit 0c424e03d8bbb5fb4c7900392ce4e4e07e5333de

Authored by Steven
1 parent 2c0c99a5

syntax fixes. WIP

Showing 1 changed file with 94 additions and 86 deletions
src/clientpaho.cpp
... ... @@ -280,18 +280,19 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
280 280 return -100;
281 281 }
282 282  
283   -std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
  283 +std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
284 284 {
285 285 ConnectionStatus currentStatus = m_connectionStatus;
286 286  
287 287 {
288 288 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
289   - if (ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus) {
  289 + if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus )
  290 + {
290 291 return -1;
291 292 }
292 293  
293 294 currentStatus = m_connectionStatus;
294   - setConnectionStatus(ConnectionStatus::DisconnectInProgress);
  295 + setConnectionStatus( ConnectionStatus::DisconnectInProgress );
295 296 }
296 297  
297 298 MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize();
... ... @@ -303,7 +304,8 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
303 304 std::promise<void> waitForDisconnectPromise{};
304 305 auto waitForDisconnect = waitForDisconnectPromise.get_future();
305 306 m_disconnectPromise.reset();
306   - if (wait) {
  307 + if( wait )
  308 + {
307 309 m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise));
308 310 }
309 311  
... ... @@ -311,29 +313,29 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
311 313 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
312 314 if (!m_pendingOperations.insert(-200).second)
313 315 {
314   - // "ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200)
  316 + //"ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200)
315 317 }
316 318 m_operationResult.erase(-200);
317 319 }
318 320  
319 321 int rc = MQTTAsync_disconnect(m_client, &disconn_opts);
320   - if (MQTTASYNC_SUCCESS != rc)
  322 + if( MQTTASYNC_SUCCESS != rc )
321 323 {
322   - if (MQTTASYNC_DISCONNECTED == rc)
  324 + if( MQTTASYNC_DISCONNECTED == rc )
323 325 {
324 326 currentStatus = ConnectionStatus::Disconnected;
325 327 }
326 328  
327   - setConnectionStatus(currentStatus);
328   - OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  329 + setConnectionStatus( currentStatus );
  330 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
329 331 m_operationResult[-200] = false;
330 332 m_pendingOperations.erase(-200);
331 333  
332   - if (MQTTASYNC_DISCONNECTED == rc)
  334 + if( MQTTASYNC_DISCONNECTED == rc )
333 335 {
334 336 return -1;
335 337 }
336   - // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
  338 + // ("ClientPaho", std::string( "%1 - failed to disconnect, return code %2" ).arg( m_clientId ).arg( pahoAsyncErrorCodeToString(rc)) );
337 339 }
338 340  
339 341 if( wait )
... ... @@ -351,64 +353,62 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
351 353  
352 354 std::int32_t ClientPaho::publish(const MqttMessage& message, int qos)
353 355 {
354   - if (ConnectionStatus::DisconnectInProgress == m_connectionStatus)
  356 + if( ConnectionStatus::DisconnectInProgress == m_connectionStatus )
355 357 {
356 358 // ("ClientPaho", "%1 - disconnect in progress, ignoring publish with qos %2 on topic %3", m_clientId, qos, message.topic());
357 359 return -1;
358 360 }
359   - else if (ConnectionStatus::Disconnected == m_connectionStatus)
  361 + else if( ConnectionStatus::Disconnected == m_connectionStatus )
360 362 {
361 363 // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId);
362 364 connect( true );
363 365 }
364 366  
365   - if (!isValidTopic(message.topic()))
  367 + if( !isValidTopic(message.topic() ) )
366 368 {
367 369 // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, message.topic());
368 370 }
369 371  
370   - if (qos > 2)
  372 + if( qos > 2 )
371 373 {
372 374 qos = 2;
373 375 }
374   - else if (qos < 0)
  376 + else if( qos < 0 )
375 377 {
376 378 qos = 0;
377 379 }
378 380  
379   -
380 381 std::unique_lock<std::mutex> lck(m_mutex);
381 382 if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes)
382   - // if (ConnectionStatus::Connected != m_connectionStatus || m_processPendingPublishes)
383 383 {
384 384 m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
385 385 if(ConnectionStatus::ReconnectInProgress == m_connectionStatus)
386 386 {
387   - LogDebug( "ClientPaho", "Adding publish to pending queue.");
  387 + LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." );
388 388 m_pendingPublishes.push_front(Publish{ qos, message });
389 389 return -1;
390 390 }
391 391 }
392 392  
393   - return publishInternal(message, qos);
  393 + return publishInternal( message, qos );
394 394 }
395 395  
396 396 void ClientPaho::publishPending()
397 397 {
398 398 {
399 399 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
400   - if (!m_processPendingPublishes)
  400 + if( !m_processPendingPublishes )
401 401 {
402 402 return;
403 403 }
404 404 }
405 405  
406   - if (ConnectionStatus::Connected != m_connectionStatus)
  406 + if( ConnectionStatus::Connected != m_connectionStatus )
407 407 {
408 408 LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) )
409 409 }
410 410  
411   - while (!m_pendingPublishes.empty())
  411 + while( !m_pendingPublishes.empty() )
412 412 {
413 413 const auto& pub = m_pendingPublishes.back();
414 414 publishInternal(pub.data, pub.qos);
... ... @@ -424,23 +424,23 @@ void ClientPaho::publishPending()
424 424 m_pendingPublishesReadyCV.notify_all();
425 425 }
426 426  
427   -std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb)
  427 +std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb )
428 428 {
429   - if (ConnectionStatus::Connected != m_connectionStatus)
  429 + if( ConnectionStatus::Connected != m_connectionStatus )
430 430 {
431 431 // MqttException, "Not connected"
432 432 }
433 433  
434   - if (!isValidTopic(topic))
  434 + if( !isValidTopic( topic ) )
435 435 {
436 436 // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
437 437 }
438 438  
439   - if (qos > 2)
  439 + if( qos > 2 )
440 440 {
441 441 qos = 2;
442 442 }
443   - else if (qos < 0)
  443 + else if( qos < 0 )
444 444 {
445 445 qos = 0;
446 446 }
... ... @@ -449,21 +449,27 @@ std::int32_t ClientPaho::subscribe(const std::string&amp; topic, int qos, const std:
449 449 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
450 450  
451 451 auto itExisting = m_subscriptions.find(topic);
452   - if (m_subscriptions.end() != itExisting) {
453   - if (itExisting->second.qos == qos) {
  452 + if( m_subscriptions.end() != itExisting )
  453 + {
  454 + if( itExisting->second.qos == qos )
  455 + {
454 456 return -1;
455 457 }
456 458 // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic);
457 459 }
458 460  
459 461 auto itPending = m_pendingSubscriptions.find(topic);
460   - if (m_pendingSubscriptions.end() != itPending) {
461   - if (itPending->second.qos == qos) {
462   - auto itToken = std::find_if(m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; });
463   - if (m_subscribeTokenToTopic.end() != itToken) {
  462 + if( m_pendingSubscriptions.end() != itPending )
  463 + {
  464 + if( itPending->second.qos == qos )
  465 + {
  466 + auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; } );
  467 + if( m_subscribeTokenToTopic.end() != itToken )
  468 + {
464 469 return itToken->first;
465 470 }
466   - else {
  471 + else
  472 + {
467 473 return -1;
468 474 }
469 475 }
... ... @@ -471,15 +477,15 @@ std::int32_t ClientPaho::subscribe(const std::string&amp; topic, int qos, const std:
471 477 }
472 478  
473 479 std::string existingTopic{};
474   - if (isOverlappingInternal(topic, existingTopic))
  480 + if( isOverlappingInternal( topic, existingTopic ) )
475 481 {
476 482 // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
477 483 }
478 484  
479 485 // ("ClientPaho", "%1 - adding subscription on topic %2 to the pending subscriptions", m_clientId, topic);
480   - m_pendingSubscriptions.emplace(std::make_pair(topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb }));
  486 + m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb } ) );
481 487 }
482   - return subscribeInternal(topic, qos);
  488 + return subscribeInternal( topic, qos );
483 489 }
484 490  
485 491 void ClientPaho::resubscribe()
... ... @@ -490,26 +496,26 @@ void ClientPaho::resubscribe()
490 496 std::copy(m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end()));
491 497 }
492 498  
493   - for (const auto& s : pendingSubscriptions)
  499 + for( const auto& s : pendingSubscriptions )
494 500 {
495   - subscribeInternal(s.first, s.second.qos);
  501 + subscribeInternal( s.first, s.second.qos );
496 502 }
497 503 }
498 504  
499 505 std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos )
500 506 {
501 507 {
502   - OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  508 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
503 509 bool found = false;
504   - for (const auto& s : m_subscriptions)
  510 + for( const auto& s : m_subscriptions )
505 511 {
506   - if (topic == s.first && qos == s.second.qos)
  512 + if( topic == s.first && qos == s.second.qos )
507 513 {
508 514 found = true;
509 515 break;
510 516 }
511 517 }
512   - if (!found)
  518 + if( !found )
513 519 {
514 520 return -1;
515 521 }
... ... @@ -525,18 +531,18 @@ std::int32_t ClientPaho::unsubscribe( const std::string&amp; topic, int qos )
525 531 // the insertion of the token into the pending operations.
526 532 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
527 533 auto rc = MQTTAsync_unsubscribe(m_client, topic.c_str(), &opts);
528   - if (MQTTASYNC_SUCCESS != rc)
  534 + if( MQTTASYNC_SUCCESS != rc )
529 535 {
530 536 // ("ClientPaho", "%1 - unsubscribe on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc));
531 537 }
532 538  
533   - if (!m_pendingOperations.insert(opts.token).second)
  539 + if( !m_pendingOperations.insert( opts.token ).second )
534 540 {
535 541 // ("ClientPaho", "%1 unsubscribe - token %2 already in use", m_clientId, opts.token);
536 542 }
537 543  
538   - m_operationResult.erase(opts.token);
539   - if (m_unsubscribeTokenToTopic.count(opts.token) > 0)
  544 + m_operationResult.erase( opts.token );
  545 + if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 )
540 546 {
541 547 // ("ClientPaho", "%1 - token already in use, replacing unsubscribe from topic %2 with topic %3", m_clientId, m_unsubscribeTokenToTopic[opts.token], topic);
542 548 }
... ... @@ -552,13 +558,14 @@ std::int32_t ClientPaho::unsubscribe( const std::string&amp; topic, int qos )
552 558  
553 559 void ClientPaho::unsubscribeAll()
554 560 {
555   - decltype(m_subscriptions) subscriptions{};
  561 + decltype( m_subscriptions ) subscriptions{};
556 562 {
557 563 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
558 564 subscriptions = m_subscriptions;
559 565 }
560 566  
561   - for (const auto& s : subscriptions) {
  567 + for( const auto& s : subscriptions )
  568 + {
562 569 this->unsubscribe(s.first, s.second.qos);
563 570 }
564 571 }
... ... @@ -708,31 +715,31 @@ std::int32_t ClientPaho::subscribeInternal(const std::string&amp; topic, int qos)
708 715 return opts.token;
709 716 }
710 717  
711   -void ClientPaho::setConnectionStatus(ConnectionStatus status)
  718 +void ClientPaho::setConnectionStatus( ConnectionStatus status )
712 719 {
713 720 ConnectionStatus curStatus = m_connectionStatus;
714 721 m_connectionStatus = status;
715   - if (status != curStatus && m_connectionStatusCallback)
  722 + if( status != curStatus && m_connectionStatusCallback )
716 723 {
717   - m_connectionStatusCallback(m_clientId, status);
  724 + m_connectionStatusCallback( m_clientId, status );
718 725 }
719 726 }
720 727  
721   -bool ClientPaho::isOverlappingInternal(const std::string& topic, std::string& existingTopic) const
  728 +bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const
722 729 {
723 730 existingTopic.clear();
724   - for (const auto& s : m_pendingSubscriptions)
  731 + for( const auto& s : m_pendingSubscriptions )
725 732 {
726   - if (testForOverlap(s.first, topic))
  733 + if( testForOverlap( s.first, topic ) )
727 734 {
728 735 existingTopic = s.first;
729 736 return true;
730 737 }
731 738 }
732 739  
733   - for (const auto& s : m_subscriptions)
  740 + for( const auto& s : m_subscriptions )
734 741 {
735   - if (testForOverlap(s.first, topic))
  742 + if( testForOverlap(s.first, topic ) )
736 743 {
737 744 existingTopic = s.first;
738 745 return true;
... ... @@ -748,26 +755,25 @@ void ClientPaho::pushIncomingEvent(std::function&lt;void()&gt; ev)
748 755  
749 756 void ClientPaho::callbackEventHandler()
750 757 {
751   - LogDebug("ClientPaho", std::string( m_clientId + " - starting callback event handler") );
752   - for (;;) {
  758 + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler") );
  759 + for( ;; )
  760 + {
753 761 std::vector<std::function<void()>> events;
754   - if (!m_callbackEventQueue.pop(events))
  762 + if( !m_callbackEventQueue.pop(events) )
755 763 {
756 764 break;
757 765 }
758 766  
759   - for (const auto& ev : events)
  767 + for( const auto& ev : events )
760 768 {
761 769 ev();
762   - // ("ClientPaho", "%1 - Exception occurred: %2", m_clientId, mlogicException);
763 770 }
764 771 }
765 772 // ("ClientPaho", "%1 - leaving callback event handler", m_clientId);
766 773 }
767   -void ClientPaho::onConnectOnInstance(const std::string& cause)
  774 +void ClientPaho::onConnectOnInstance( const std::string& cause )
768 775 {
769 776 (void)cause;
770   - // toLogFile ("ClientPaho", "onConnectOnInstance %1 - reconnected (cause %2)", m_clientId, cause);
771 777 {
772 778 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
773 779 std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end()));
... ... @@ -783,10 +789,10 @@ void ClientPaho::onConnectSuccessOnInstance()
783 789 {
784 790 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
785 791 // Register the connect callback that is used in reconnect scenarios.
786   - auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect);
787   - if (MQTTASYNC_SUCCESS != rc)
  792 + auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect );
  793 + if( MQTTASYNC_SUCCESS != rc )
788 794 {
789   - LogError( "[ClientPaho]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
  795 + LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
790 796 }
791 797  
792 798 // For MQTTV5
... ... @@ -800,19 +806,19 @@ void ClientPaho::onConnectSuccessOnInstance()
800 806 m_pendingOperations.erase(-100);
801 807 }
802 808  
803   - setConnectionStatus(ConnectionStatus::Connected);
804   - if(m_connectPromise)
  809 + setConnectionStatus( ConnectionStatus::Connected );
  810 + if( m_connectPromise )
805 811 {
806   - LogDebug( "[ClientPaho]", std::string("connectPromise still present. Resetting!") );
  812 + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!") );
807 813 m_connectPromise->set_value();
808 814 }
809 815 m_operationsCompleteCV.notify_all();
810 816 }
811 817  
812   -void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response)
  818 +void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response )
813 819 {
814   - (void)response;
815   - LogDebug("ClientPaho", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
  820 + (void) response;
  821 + LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
816 822 {
817 823 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
818 824 // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
... ... @@ -846,17 +852,18 @@ void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&amp;)
846 852 m_pendingOperations.clear();
847 853 }
848 854  
849   - setConnectionStatus(ConnectionStatus::Disconnected);
  855 + setConnectionStatus( ConnectionStatus::Disconnected );
850 856  
851   - if (m_disconnectPromise) {
  857 + if( m_disconnectPromise )
  858 + {
852 859 m_disconnectPromise->set_value();
853 860 }
854 861 m_operationsCompleteCV.notify_all();
855 862 }
856 863  
857   -void ClientPaho::onDisconnectFailureOnInstance(const MqttFailure& response)
  864 +void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response )
858 865 {
859   - (void)response;
  866 + (void) response;
860 867 // ("ClientPaho", "onDisconnectFailureOnInstance %1 - disconnect failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());
861 868 {
862 869 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
... ... @@ -865,23 +872,23 @@ void ClientPaho::onDisconnectFailureOnInstance(const MqttFailure&amp; response)
865 872 m_pendingOperations.erase(-200);
866 873 }
867 874  
868   - if (MQTTAsync_isConnected(m_client))
  875 + if( MQTTAsync_isConnected( m_client ) )
869 876 {
870   - setConnectionStatus(ConnectionStatus::Connected);
  877 + setConnectionStatus( ConnectionStatus::Connected );
871 878 }
872 879 else
873 880 {
874   - setConnectionStatus(ConnectionStatus::Disconnected);
  881 + setConnectionStatus( ConnectionStatus::Disconnected );
875 882 }
876 883  
877   - if (m_disconnectPromise)
  884 + if( m_disconnectPromise )
878 885 {
879 886 m_disconnectPromise->set_value();
880 887 }
881 888 m_operationsCompleteCV.notify_all();
882 889 }
883 890  
884   -void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess& response)
  891 +void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
885 892 {
886 893 auto pd = response.publishData();
887 894 // ("ClientPaho", "onPublishSuccessOnInstance %1 - publish with token %2 succeeded (message was %3)", m_clientId, response.token(), pd.payload());
... ... @@ -894,7 +901,7 @@ void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess&amp; response)
894 901 m_operationsCompleteCV.notify_all();
895 902 }
896 903  
897   -void ClientPaho::onPublishFailureOnInstance(const MqttFailure& response)
  904 +void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response )
898 905 {
899 906 // ("ClientPaho", "onPublishFailureOnInstance %1 - publish with token %2 failed with code %3 (%4)", m_clientId, response.token(), response.codeToString(), response.message());
900 907 {
... ... @@ -906,7 +913,7 @@ void ClientPaho::onPublishFailureOnInstance(const MqttFailure&amp; response)
906 913 m_operationsCompleteCV.notify_all();
907 914 }
908 915  
909   -void ClientPaho::onSubscribeSuccessOnInstance(const MqttSuccess& response)
  916 +void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response )
910 917 {
911 918 // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscribe with token %2 succeeded", m_clientId, response.token());
912 919 OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
... ... @@ -919,7 +926,8 @@ void ClientPaho::onSubscribeSuccessOnInstance(const MqttSuccess&amp; response)
919 926 m_pendingOperations.erase(response.token());
920 927 });
921 928 auto it = m_subscribeTokenToTopic.find(response.token());
922   - if (m_subscribeTokenToTopic.end() == it) {
  929 + if (m_subscribeTokenToTopic.end() == it)
  930 + {
923 931 // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, response.token());
924 932 return;
925 933 }
... ...