Closed
Merge Request #15 · created by Steven de Ridder


Development


From development into feat/sridder/connection_attempt_settings

Closed by Steven de Ridder

Changes were not merged into target branch

1 participants







CMakeLists.txt
@@ -8,6 +8,7 @@ include(projectheader) @@ -8,6 +8,7 @@ include(projectheader)
8 project_header(osdev_mqtt) 8 project_header(osdev_mqtt)
9 9
10 add_subdirectory(src) 10 add_subdirectory(src)
  11 +add_subdirectory(examples/connect)
11 add_subdirectory(examples/pub) 12 add_subdirectory(examples/pub)
12 add_subdirectory(examples/sub) 13 add_subdirectory(examples/sub)
13 14
debug_log.txt 0 → 100644
  1 +With broker present
  2 +
  3 +Jul 02 00:50:19 intelnuc64.osdev.nl test_connection[30797]: MQTT Client started[30797]: [MqttClient::MqttClient]
  4 +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::eventHandler][30797]: ConnectionTest - starting event handler.
  5 +Jul 02 00:50:19 intelnuc64.osdev.nl MqttClient[30797]: ConnectionTest - Request connect
  6 +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho][30797]: ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - Setting the extra onConnected callback.
  7 +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][30797]: ConnectionTest - connection status of wrapped client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a changed to 2
  8 +Jul 02 00:50:19 intelnuc64.osdev.nl ClientPaho[30797]: ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - starting callback event handler
  9 +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho::onConnectSuccess][30797]: onConnectSucces triggered..
  10 +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho][30797]: onConnectSuccessOnInstance ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - connected to endpoint localhost:1883 (mqtt version 4, session present FALSE )
  11 +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][30797]: ConnectionTest - connection status of wrapped client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a changed to 4
  12 +Jul 02 00:50:30 intelnuc64.osdev.nl mosquitto[29463]: 1656715830: Client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a closed its connection.
  13 +
  14 +
  15 +Without a broker present
  16 +
  17 +Jul 02 00:55:33 intelnuc64.osdev.nl test_connection[31574]: MQTT Client started[31574]: [MqttClient::MqttClient]
  18 +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::eventHandler][31574]: ConnectionTest - starting event handler.
  19 +Jul 02 00:55:33 intelnuc64.osdev.nl MqttClient[31574]: ConnectionTest - Request connect
  20 +Jul 02 00:55:33 intelnuc64.osdev.nl [ClientPaho][31574]: ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - Setting the extra onConnected callback.
  21 +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][31574]: ConnectionTest - connection status of wrapped client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 changed to 2
  22 +Jul 02 00:55:33 intelnuc64.osdev.nl ClientPaho[31574]: ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - starting callback event handler
  23 +Jul 02 00:55:33 intelnuc64.osdev.nl ClientPaho[31574]: onConnectFailureOnInstanceConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - connection failed with code MQTTASYNC_FAILURE (TCP/TLS connect failure)
  24 +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][31574]: ConnectionTest - connection status of wrapped client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 changed to 0
  25 +
  26 +Jul 02 00:55:42 intelnuc64.osdev.nl systemd[1]: Starting Mosquitto MQTT Broker...
  27 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: mosquitto version 2.0.14 starting
  28 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Config loaded from /etc/mosquitto/mosquitto.conf.
  29 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Starting in local only mode. Connections will only be possible from clients running on this machine.
  30 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Create a configuration file which defines a listener to allow remote access.
  31 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: For more details see https://mosquitto.org/documentation/authentication-methods/
  32 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Opening ipv4 listen socket on port 1883.
  33 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Opening ipv6 listen socket on port 1883.
  34 +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: mosquitto version 2.0.14 running
  35 +Jul 02 00:55:45 intelnuc64.osdev.nl mosquitto[31610]: 1656716145: New connection from 127.0.0.1:42196 on port 1883.
  36 +Jul 02 00:55:45 intelnuc64.osdev.nl mosquitto[31610]: 1656716145: New client connected from 127.0.0.1:42196 as ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 (p2, c1, k5).
  37 +Jul 02 00:55:53 intelnuc64.osdev.nl mosquitto[31610]: 1656716153: Client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 closed its connection.
  38 +
  39 +
examples/connect/CMakeLists.txt 0 → 100644
  1 +cmake_minimum_required(VERSION 3.0)
  2 +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake)
  3 +
  4 +include(projectheader)
  5 +project_header(test_connections)
  6 +
  7 +include_directories( SYSTEM
  8 + ${CMAKE_CURRENT_SOURCE_DIR}/../../include
  9 +)
  10 +
  11 +include(compiler)
  12 +set(SRC_LIST
  13 + ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp
  14 +)
  15 +
  16 +add_executable( ${PROJECT_NAME}
  17 + ${SRC_LIST}
  18 +)
  19 +
  20 +target_link_libraries(
  21 + ${PROJECT_NAME}
  22 + mqtt-cpp
  23 +)
  24 +
  25 +set_target_properties( ${PROJECT_NAME} PROPERTIES
  26 + RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin
  27 + LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib
  28 + ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib
  29 +)
  30 +
  31 +include(installation)
  32 +install_application()
examples/connect/main.cpp 0 → 100644
  1 +
  2 +#include <iostream>
  3 +#include <unistd.h>
  4 +
  5 +#include "mqttclient.h"
  6 +
  7 +using namespace osdev::components::mqtt;
  8 +
  9 +int main( int argc, char* argv[] )
  10 +{
  11 + (void)argc;
  12 + (void)argv;
  13 +
  14 + MqttClient oClient( "ConnectionTest" );
  15 + oClient.connect( "localhost", 1883, Credentials() );
  16 +
  17 + unsigned int nCount = 0;
  18 +
  19 + while(1)
  20 + {
  21 + std::cout << "[" << std::to_string(nCount++) << "] MQTT Client status : " << oClient.state() << std::endl;
  22 + sleep( 1 );
  23 + }
  24 +
  25 + return 0;
  26 +}
examples/pub/main.cpp
@@ -94,7 +94,7 @@ int main( int argc, char* argv[] ) @@ -94,7 +94,7 @@ int main( int argc, char* argv[] )
94 } 94 }
95 messageNumber++; 95 messageNumber++;
96 } 96 }
97 - sleepcp( 5, T_SECONDS ); 97 + sleepcp( 1, T_SECONDS );
98 } 98 }
99 } 99 }
100 else 100 else
examples/sub/main.cpp
@@ -76,14 +76,16 @@ int main( int argc, char* argv[] ) @@ -76,14 +76,16 @@ int main( int argc, char* argv[] )
76 { 76 {
77 std::cout << "[OK]" << std::endl; 77 std::cout << "[OK]" << std::endl;
78 std::cout << "Connecting to the test-broker : " << std::endl; 78 std::cout << "Connecting to the test-broker : " << std::endl;
79 - pSubscriber->connect( "localhost", 1883, "", "" ); 79 + pSubscriber->connect( "localhost", 1883, "", "", "test/subscriber/LWT", "Subscriber disconnected." );
  80 +
80 std::cout << "Subscribing to the test-topic....." << std::endl; 81 std::cout << "Subscribing to the test-topic....." << std::endl;
81 pSubscriber->subscribe( "test/publisher/#" ); 82 pSubscriber->subscribe( "test/publisher/#" );
82 83
83 // Start a loop to give the subscriber the possibility to do its work. 84 // Start a loop to give the subscriber the possibility to do its work.
84 while( 1 ) 85 while( 1 )
85 { 86 {
86 - sleepcp( 1, T_MICRO ); // Sleep 1 Sec to give the scheduler the change to interfene. 87 + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene.
  88 + std::cout << ".";
87 } 89 }
88 } 90 }
89 else 91 else
examples/sub/subscriber.cpp
@@ -24,11 +24,25 @@ @@ -24,11 +24,25 @@
24 #include <iostream> 24 #include <iostream>
25 25
26 Subscriber::Subscriber( const std::string &client_id ) 26 Subscriber::Subscriber( const std::string &client_id )
27 - : MqttSubscriberBase( client_id ) 27 + : m_mqtt_client( client_id )
28 { 28 {
29 29
30 } 30 }
31 31
  32 +void Subscriber::connect( const std::string &hostname, int portnumber, const std::string &username, const std::string &password, const std::string &lwt_topic, const std::string &lwt_message )
  33 +{
  34 + m_mqtt_client.connect( hostname, portnumber, osdev::components::mqtt::Credentials( username, password ), osdev::components::mqtt::mqtt_LWT( lwt_topic, lwt_message ) );
  35 + std::cout << "Client state : " << m_mqtt_client.state() << std::endl;
  36 +}
  37 +
  38 +void Subscriber::subscribe( const std::string &message_topic )
  39 +{
  40 + m_mqtt_client.subscribe( message_topic, 1, [this](const osdev::components::mqtt::MqttMessage &message )
  41 + {
  42 + this->receive_data( message.topic(), message.payload() );
  43 + });
  44 +}
  45 +
32 void Subscriber::receive_data( const std::string &message_topic, const std::string &message_payload ) 46 void Subscriber::receive_data( const std::string &message_topic, const std::string &message_payload )
33 { 47 {
34 std::cout << "[Subscriber::receive_data] - Received message : " << message_payload << " from topic : " << message_topic << std::endl; 48 std::cout << "[Subscriber::receive_data] - Received message : " << message_payload << " from topic : " << message_topic << std::endl;
examples/sub/subscriber.h
@@ -22,19 +22,29 @@ @@ -22,19 +22,29 @@
22 #pragma once 22 #pragma once
23 23
24 // std 24 // std
  25 +#include <memory>
25 #include <string> 26 #include <string>
26 27
27 // mqtt-cpp 28 // mqtt-cpp
28 -#include "mqttsubscriberbase.h" 29 +#include "mqttclient.h"
  30 +#include "compat-c++14.h"
29 31
30 -class Subscriber : public MqttSubscriberBase 32 +class Subscriber
31 { 33 {
32 public: 34 public:
33 - Subscriber( const std::string &client_id ); 35 + Subscriber(const std::string &client_id);
34 36
35 virtual ~Subscriber() {} 37 virtual ~Subscriber() {}
36 38
  39 + void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string(),
  40 + const std::string &lwt_topic = std::string(), const std::string &lwt_message = std::string() );
  41 +
  42 + void subscribe( const std::string &message_topic );
  43 +
37 protected: 44 protected:
38 - void receive_data( const std::string &message_topic, const std::string &message_payload ); 45 + void receive_data( const std::string &message_topic, const std::string &message_payload );
  46 +
  47 +private:
  48 + osdev::components::mqtt::MqttClient m_mqtt_client;
39 49
40 }; 50 };
include/clientpaho.h
@@ -215,7 +215,7 @@ private: @@ -215,7 +215,7 @@ private:
215 * The connection status is set to Connected. 215 * The connection status is set to Connected.
216 * @param response A success response with connection data. 216 * @param response A success response with connection data.
217 */ 217 */
218 - void onConnectSuccessOnInstance(const MqttSuccess& response); 218 + void onConnectSuccessOnInstance();
219 219
220 /** 220 /**
221 * @brief Callback that is called when a connect call fails after being sent to the endpoint. 221 * @brief Callback that is called when a connect call fails after being sent to the endpoint.
@@ -298,6 +298,7 @@ private: @@ -298,6 +298,7 @@ private:
298 void onConnectionLostOnInstance(const std::string& cause); 298 void onConnectionLostOnInstance(const std::string& cause);
299 299
300 // Static callback functions that are registered on the paho library. Functions call their *OnInstance() counterparts. 300 // Static callback functions that are registered on the paho library. Functions call their *OnInstance() counterparts.
  301 + static void onFirstConnect(void* context, char* cause);
301 static void onConnect(void* context, char* cause); 302 static void onConnect(void* context, char* cause);
302 static void onConnectSuccess(void* context, MQTTAsync_successData* response); 303 static void onConnectSuccess(void* context, MQTTAsync_successData* response);
303 static void onConnectFailure(void* context, MQTTAsync_failureData* response); 304 static void onConnectFailure(void* context, MQTTAsync_failureData* response);
include/imqttclient.h
@@ -60,13 +60,13 @@ public: @@ -60,13 +60,13 @@ public:
60 * @param port The port to use. 60 * @param port The port to use.
61 * @param credentials The credentials to use. 61 * @param credentials The credentials to use.
62 */ 62 */
63 - virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; 63 + virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0;
64 64
65 /** 65 /**
66 * @brief Connect to the endpoint 66 * @brief Connect to the endpoint
67 * @param endpoint an uri endpoint description. 67 * @param endpoint an uri endpoint description.
68 */ 68 */
69 - virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; 69 + virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0;
70 70
71 /** 71 /**
72 * @brief Disconnect the client from the broker 72 * @brief Disconnect the client from the broker
include/mqttclient.h
@@ -42,6 +42,26 @@ namespace osdev { @@ -42,6 +42,26 @@ namespace osdev {
42 namespace components { 42 namespace components {
43 namespace mqtt { 43 namespace mqtt {
44 44
  45 +// Internal structure for storing subscriptions until a valid connection becomes available.
  46 +class Subscription
  47 +{
  48 +public:
  49 + Subscription( const std::string &topic, int qos, const std::function<void(MqttMessage)> call_back )
  50 + : m_topic( topic )
  51 + , m_qos( qos )
  52 + , m_call_back(call_back )
  53 + {}
  54 +
  55 + std::string getTopic() const { return m_topic; }
  56 + int getQoS() const { return m_qos; }
  57 + std::function<void(MqttMessage)> getCallBack() { return m_call_back; }
  58 +
  59 +private:
  60 + std::string m_topic;
  61 + int m_qos;
  62 + std::function<void(MqttMessage)> m_call_back;
  63 +};
  64 +
45 // Forward definition 65 // Forward definition
46 class IMqttClientImpl; 66 class IMqttClientImpl;
47 67
@@ -92,12 +112,12 @@ public: @@ -92,12 +112,12 @@ public:
92 /** 112 /**
93 * @see IMqttClient 113 * @see IMqttClient
94 */ 114 */
95 - virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT() ) override; 115 + virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override;
96 116
97 /** 117 /**
98 * @see IMqttClient 118 * @see IMqttClient
99 */ 119 */
100 - virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) override; 120 + virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override;
101 121
102 /** 122 /**
103 * @see IMqttClient 123 * @see IMqttClient
@@ -215,6 +235,7 @@ private: @@ -215,6 +235,7 @@ private:
215 235
216 mutable std::mutex m_interfaceMutex; ///< Makes the interface mutual exclusive 236 mutable std::mutex m_interfaceMutex; ///< Makes the interface mutual exclusive
217 mutable std::mutex m_internalMutex; ///< Protect the internal state. 237 mutable std::mutex m_internalMutex; ///< Protect the internal state.
  238 + mutable std::mutex m_subscriptionMutex; ///< Protect the deferred Subscription Buffer
218 std::string m_endpoint; ///< The endpoint uri. 239 std::string m_endpoint; ///< The endpoint uri.
219 std::string m_clientId; ///< The main client identification. 240 std::string m_clientId; ///< The main client identification.
220 std::set<Token> m_activeTokens; ///< Set with active command tokens. Callbacks still need to be made for these tokens. 241 std::set<Token> m_activeTokens; ///< Set with active command tokens. Callbacks still need to be made for these tokens.
@@ -225,6 +246,7 @@ private: @@ -225,6 +246,7 @@ private:
225 std::vector<std::unique_ptr<IMqttClientImpl>> m_additionalClients; ///< A vector of additional wrapper clients. 246 std::vector<std::unique_ptr<IMqttClientImpl>> m_additionalClients; ///< A vector of additional wrapper clients.
226 SynchronizedQueue<std::function<void()>> m_eventQueue; ///< Synchronized queue for scheduling additional work. 247 SynchronizedQueue<std::function<void()>> m_eventQueue; ///< Synchronized queue for scheduling additional work.
227 std::thread m_workerThread; ///< A worker thread that is used to perform actions that cannot be done on the callback threads. 248 std::thread m_workerThread; ///< A worker thread that is used to perform actions that cannot be done on the callback threads.
  249 + std::vector<Subscription> m_deferredSubscriptions; ///< A buffer to store subscription requests until the principal client comes online
228 }; 250 };
229 251
230 } // End namespace mqtt 252 } // End namespace mqtt
src/clientpaho.cpp
@@ -24,6 +24,7 @@ @@ -24,6 +24,7 @@
24 #include "errorcode.h" 24 #include "errorcode.h"
25 #include "mqttutil.h" 25 #include "mqttutil.h"
26 #include "lockguard.h" 26 #include "lockguard.h"
  27 +#include "log.h"
27 #include "metaprogrammingdefs.h" 28 #include "metaprogrammingdefs.h"
28 #include "mqttstream.h" 29 #include "mqttstream.h"
29 #include "scopeguard.h" 30 #include "scopeguard.h"
@@ -93,10 +94,10 @@ struct Init @@ -93,10 +94,10 @@ struct Init
93 94
94 std::atomic_int ClientPaho::s_numberOfInstances(0); 95 std::atomic_int ClientPaho::s_numberOfInstances(0);
95 96
96 -ClientPaho::ClientPaho(const std::string& _endpoint, 97 +ClientPaho::ClientPaho( const std::string& _endpoint,
97 const std::string& _id, 98 const std::string& _id,
98 - const std::function<void(const std::string&, ConnectionStatus)>& connectionStatusCallback,  
99 - const std::function<void(const std::string& clientId, std::int32_t pubMsgToken)>& deliveryCompleteCallback) 99 + const std::function<void( const std::string&, ConnectionStatus )>& connectionStatusCallback,
  100 + const std::function<void( const std::string& clientId, std::int32_t pubMsgToken )>& deliveryCompleteCallback )
100 : m_mutex() 101 : m_mutex()
101 , m_endpoint() 102 , m_endpoint()
102 , m_username() 103 , m_username()
@@ -122,51 +123,55 @@ ClientPaho::ClientPaho(const std::string&amp; _endpoint, @@ -122,51 +123,55 @@ ClientPaho::ClientPaho(const std::string&amp; _endpoint,
122 , m_callbackEventQueue(m_clientId) 123 , m_callbackEventQueue(m_clientId)
123 , m_workerThread() 124 , m_workerThread()
124 { 125 {
125 - if (0 == s_numberOfInstances++) {  
126 - MQTTAsync_setTraceCallback(&ClientPaho::onLogPaho); 126 + if( 0 == s_numberOfInstances++ )
  127 + {
  128 + MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho );
127 } 129 }
128 - // MLOGIC_COMMON_DEBUG("ClientPaho", "%1 - ctor ClientPaho %2", m_clientId, this); 130 +
  131 + LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) );
129 parseEndpoint(_endpoint); 132 parseEndpoint(_endpoint);
130 - auto rc = MQTTAsync_create(&m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);  
131 - if (MQTTASYNC_SUCCESS == rc) 133 +
  134 + auto rc = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr );
  135 + if( MQTTASYNC_SUCCESS == rc )
132 { 136 {
133 - MQTTAsync_setCallbacks(m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete);  
134 - m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this); 137 + MQTTAsync_setCallbacks( m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete );
  138 + m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this );
135 } 139 }
136 else 140 else
137 { 141 {
138 - // Do something sensible here. 142 + LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( rc ) ) );
139 } 143 }
140 } 144 }
141 145
142 ClientPaho::~ClientPaho() 146 ClientPaho::~ClientPaho()
143 { 147 {
  148 + LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) );
144 if( MQTTAsync_isConnected( m_client ) ) 149 if( MQTTAsync_isConnected( m_client ) )
145 { 150 {
146 this->unsubscribeAll(); 151 this->unsubscribeAll();
147 152
148 - this->waitForCompletion(std::chrono::milliseconds(2000), std::set<int32_t>{});  
149 - this->disconnect(true, 5000); 153 + this->waitForCompletion( std::chrono::milliseconds(2000), std::set<int32_t>{} );
  154 + this->disconnect( true, 5000 );
150 } 155 }
151 else 156 else
152 { 157 {
153 // If the status was already disconnected this call does nothing 158 // If the status was already disconnected this call does nothing
154 - setConnectionStatus(ConnectionStatus::Disconnected); 159 + setConnectionStatus( ConnectionStatus::Disconnected );
155 } 160 }
156 161
157 - if (0 == --s_numberOfInstances) 162 + if( 0 == --s_numberOfInstances )
158 { 163 {
159 // encountered a case where termination of the logging system within paho led to a segfault. 164 // encountered a case where termination of the logging system within paho led to a segfault.
160 // This was a paho thread that was cleaned while at the same time the logging system was terminated. 165 // This was a paho thread that was cleaned while at the same time the logging system was terminated.
161 // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less 166 // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less
162 // frequently. 167 // frequently.
163 - MQTTAsync_setTraceCallback(nullptr); 168 + MQTTAsync_setTraceCallback( nullptr );
164 } 169 }
165 170
166 - MQTTAsync_destroy(&m_client); 171 + MQTTAsync_destroy( &m_client );
167 172
168 m_callbackEventQueue.stop(); 173 m_callbackEventQueue.stop();
169 - if (m_workerThread.joinable()) 174 + if( m_workerThread.joinable() )
170 { 175 {
171 m_workerThread.join(); 176 m_workerThread.join();
172 } 177 }
@@ -193,14 +198,28 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt ) @@ -193,14 +198,28 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
193 setConnectionStatus( ConnectionStatus::ConnectInProgress ); 198 setConnectionStatus( ConnectionStatus::ConnectInProgress );
194 } 199 }
195 200
  201 + LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) );
  202 +
196 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; 203 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
197 - conn_opts.keepAliveInterval = 20; 204 + conn_opts.keepAliveInterval = 5;
198 conn_opts.cleansession = 1; 205 conn_opts.cleansession = 1;
199 - conn_opts.onSuccess = &ClientPaho::onConnectSuccess; 206 + conn_opts.onSuccess = nullptr;
200 conn_opts.onFailure = &ClientPaho::onConnectFailure; 207 conn_opts.onFailure = &ClientPaho::onConnectFailure;
201 conn_opts.context = this; 208 conn_opts.context = this;
202 conn_opts.automaticReconnect = 1; 209 conn_opts.automaticReconnect = 1;
203 210
  211 + // Make sure we get a signal if the promise is fulfilled
  212 + auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect );
  213 + if( MQTTASYNC_SUCCESS == ccb )
  214 + {
  215 + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
  216 + }
  217 + else
  218 + {
  219 + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
  220 + }
  221 +
  222 + // Setup the last will and testament, if so desired.
204 if( !lwt.topic().empty() ) 223 if( !lwt.topic().empty() )
205 { 224 {
206 MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer; 225 MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
@@ -208,13 +227,14 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt ) @@ -208,13 +227,14 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
208 will_opts.topicName = lwt.topic().c_str(); 227 will_opts.topicName = lwt.topic().c_str();
209 228
210 conn_opts.will = &will_opts; 229 conn_opts.will = &will_opts;
  230 +
  231 + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) );
211 } 232 }
212 else 233 else
213 { 234 {
214 conn_opts.will = nullptr; 235 conn_opts.will = nullptr;
215 } 236 }
216 237
217 -  
218 if( !m_username.empty() ) 238 if( !m_username.empty() )
219 { 239 {
220 conn_opts.username = m_username.c_str(); 240 conn_opts.username = m_username.c_str();
@@ -259,18 +279,19 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt ) @@ -259,18 +279,19 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &amp;lwt )
259 return -100; 279 return -100;
260 } 280 }
261 281
262 -std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) 282 +std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
263 { 283 {
264 ConnectionStatus currentStatus = m_connectionStatus; 284 ConnectionStatus currentStatus = m_connectionStatus;
265 285
266 { 286 {
267 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 287 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
268 - if (ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus) { 288 + if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus )
  289 + {
269 return -1; 290 return -1;
270 } 291 }
271 292
272 currentStatus = m_connectionStatus; 293 currentStatus = m_connectionStatus;
273 - setConnectionStatus(ConnectionStatus::DisconnectInProgress); 294 + setConnectionStatus( ConnectionStatus::DisconnectInProgress );
274 } 295 }
275 296
276 MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize(); 297 MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize();
@@ -282,43 +303,45 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) @@ -282,43 +303,45 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
282 std::promise<void> waitForDisconnectPromise{}; 303 std::promise<void> waitForDisconnectPromise{};
283 auto waitForDisconnect = waitForDisconnectPromise.get_future(); 304 auto waitForDisconnect = waitForDisconnectPromise.get_future();
284 m_disconnectPromise.reset(); 305 m_disconnectPromise.reset();
285 - if (wait) { 306 + if( wait )
  307 + {
286 m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise)); 308 m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise));
287 } 309 }
288 310
289 { 311 {
290 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 312 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
291 - if (!m_pendingOperations.insert(-200).second) 313 + if( !m_pendingOperations.insert( -200 ).second )
292 { 314 {
293 - // "ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200) 315 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) );
294 } 316 }
295 - m_operationResult.erase(-200); 317 + m_operationResult.erase( -200 );
296 } 318 }
297 319
298 - int rc = MQTTAsync_disconnect(m_client, &disconn_opts);  
299 - if (MQTTASYNC_SUCCESS != rc) {  
300 - if (MQTTASYNC_DISCONNECTED == rc) { 320 + int rc = MQTTAsync_disconnect( m_client, &disconn_opts );
  321 + if( MQTTASYNC_SUCCESS != rc )
  322 + {
  323 + if( MQTTASYNC_DISCONNECTED == rc )
  324 + {
301 currentStatus = ConnectionStatus::Disconnected; 325 currentStatus = ConnectionStatus::Disconnected;
302 } 326 }
303 327
304 - setConnectionStatus(currentStatus);  
305 - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 328 + setConnectionStatus( currentStatus );
  329 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
306 m_operationResult[-200] = false; 330 m_operationResult[-200] = false;
307 - m_pendingOperations.erase(-200); 331 + m_pendingOperations.erase( -200 );
308 332
309 - if (MQTTASYNC_DISCONNECTED == rc) 333 + if( MQTTASYNC_DISCONNECTED == rc )
310 { 334 {
311 return -1; 335 return -1;
312 } 336 }
313 - // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); 337 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) );
314 } 338 }
315 339
316 if( wait ) 340 if( wait )
317 { 341 {
318 if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) 342 if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
319 { 343 {
320 - // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId);  
321 - 344 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) );
322 } 345 }
323 waitForDisconnect.get(); 346 waitForDisconnect.get();
324 m_disconnectPromise.reset(); 347 m_disconnectPromise.reset();
@@ -326,67 +349,67 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) @@ -326,67 +349,67 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs)
326 return -200; 349 return -200;
327 } 350 }
328 351
329 -std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) 352 +std::int32_t ClientPaho::publish( const MqttMessage& message, int qos )
330 { 353 {
331 - if (ConnectionStatus::DisconnectInProgress == m_connectionStatus) 354 + if( ConnectionStatus::DisconnectInProgress == m_connectionStatus )
332 { 355 {
333 - // ("ClientPaho", "%1 - disconnect in progress, ignoring publish with qos %2 on topic %3", m_clientId, qos, message.topic()); 356 + LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) );
334 return -1; 357 return -1;
335 } 358 }
336 - else if (ConnectionStatus::Disconnected == m_connectionStatus) 359 + else if( ConnectionStatus::Disconnected == m_connectionStatus )
337 { 360 {
338 - // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); 361 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) );
339 connect( true ); 362 connect( true );
340 } 363 }
341 364
342 - if (!isValidTopic(message.topic())) 365 + if( !isValidTopic(message.topic() ) )
343 { 366 {
344 - // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, message.topic()); 367 + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) );
345 } 368 }
346 369
347 - if (qos > 2) 370 + if( qos > 2 )
348 { 371 {
349 qos = 2; 372 qos = 2;
350 } 373 }
351 - else if (qos < 0) 374 + else if( qos < 0 )
352 { 375 {
353 qos = 0; 376 qos = 0;
354 } 377 }
355 378
356 -  
357 std::unique_lock<std::mutex> lck(m_mutex); 379 std::unique_lock<std::mutex> lck(m_mutex);
358 - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) { 380 + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes )
  381 + {
359 m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; }); 382 m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
360 - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus) {  
361 - // ("ClientPaho", "Adding publish to pending queue.");  
362 - m_pendingPublishes.push_front(Publish{ qos, message }); 383 + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus )
  384 + {
  385 + LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." );
  386 + m_pendingPublishes.push_front( Publish{ qos, message } );
363 return -1; 387 return -1;
364 } 388 }
365 } 389 }
366 390
367 - return publishInternal(message, qos); 391 + return publishInternal( message, qos );
368 } 392 }
369 393
370 void ClientPaho::publishPending() 394 void ClientPaho::publishPending()
371 { 395 {
372 { 396 {
373 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 397 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
374 - if (!m_processPendingPublishes) 398 + if( !m_processPendingPublishes )
375 { 399 {
376 return; 400 return;
377 } 401 }
378 } 402 }
379 403
380 - if (ConnectionStatus::Connected != m_connectionStatus) 404 + if( ConnectionStatus::Connected != m_connectionStatus )
381 { 405 {
382 - // MqttException, "Not connected"); 406 + LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) )
383 } 407 }
384 408
385 - while (!m_pendingPublishes.empty()) 409 + while( !m_pendingPublishes.empty() )
386 { 410 {
387 const auto& pub = m_pendingPublishes.back(); 411 const auto& pub = m_pendingPublishes.back();
388 - publishInternal(pub.data, pub.qos);  
389 - // else ("ClientPaho", "%1 - pending publish on topic %2 failed : %3", m_clientId, pub.data.topic(), e.what()); 412 + publishInternal( pub.data, pub.qos );
390 413
391 m_pendingPublishes.pop_back(); 414 m_pendingPublishes.pop_back();
392 } 415 }
@@ -398,23 +421,25 @@ void ClientPaho::publishPending() @@ -398,23 +421,25 @@ void ClientPaho::publishPending()
398 m_pendingPublishesReadyCV.notify_all(); 421 m_pendingPublishesReadyCV.notify_all();
399 } 422 }
400 423
401 -std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb) 424 +std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb )
402 { 425 {
403 - if (ConnectionStatus::Connected != m_connectionStatus) 426 + if( ConnectionStatus::Connected != m_connectionStatus )
404 { 427 {
405 - // MqttException, "Not connected" 428 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) );
406 } 429 }
407 430
408 - if (!isValidTopic(topic)) 431 + if( !isValidTopic( topic ) )
409 { 432 {
410 // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); 433 // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
  434 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) );
  435 + return -1;
411 } 436 }
412 437
413 - if (qos > 2) 438 + if( qos > 2 )
414 { 439 {
415 qos = 2; 440 qos = 2;
416 } 441 }
417 - else if (qos < 0) 442 + else if( qos < 0 )
418 { 443 {
419 qos = 0; 444 qos = 0;
420 } 445 }
@@ -423,21 +448,27 @@ std::int32_t ClientPaho::subscribe(const std::string&amp; topic, int qos, const std: @@ -423,21 +448,27 @@ std::int32_t ClientPaho::subscribe(const std::string&amp; topic, int qos, const std:
423 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 448 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
424 449
425 auto itExisting = m_subscriptions.find(topic); 450 auto itExisting = m_subscriptions.find(topic);
426 - if (m_subscriptions.end() != itExisting) {  
427 - if (itExisting->second.qos == qos) { 451 + if( m_subscriptions.end() != itExisting )
  452 + {
  453 + if( itExisting->second.qos == qos )
  454 + {
428 return -1; 455 return -1;
429 } 456 }
430 // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic); 457 // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic);
431 } 458 }
432 459
433 auto itPending = m_pendingSubscriptions.find(topic); 460 auto itPending = m_pendingSubscriptions.find(topic);
434 - if (m_pendingSubscriptions.end() != itPending) {  
435 - if (itPending->second.qos == qos) {  
436 - auto itToken = std::find_if(m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; });  
437 - if (m_subscribeTokenToTopic.end() != itToken) { 461 + if( m_pendingSubscriptions.end() != itPending )
  462 + {
  463 + if( itPending->second.qos == qos )
  464 + {
  465 + auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; } );
  466 + if( m_subscribeTokenToTopic.end() != itToken )
  467 + {
438 return itToken->first; 468 return itToken->first;
439 } 469 }
440 - else { 470 + else
  471 + {
441 return -1; 472 return -1;
442 } 473 }
443 } 474 }
@@ -445,45 +476,46 @@ std::int32_t ClientPaho::subscribe(const std::string&amp; topic, int qos, const std: @@ -445,45 +476,46 @@ std::int32_t ClientPaho::subscribe(const std::string&amp; topic, int qos, const std:
445 } 476 }
446 477
447 std::string existingTopic{}; 478 std::string existingTopic{};
448 - if (isOverlappingInternal(topic, existingTopic)) 479 + if( isOverlappingInternal( topic, existingTopic ) )
449 { 480 {
450 // (OverlappingTopicException, "overlapping topic", existingTopic, topic); 481 // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
  482 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) );
451 } 483 }
452 484
453 - // ("ClientPaho", "%1 - adding subscription on topic %2 to the pending subscriptions", m_clientId, topic);  
454 - m_pendingSubscriptions.emplace(std::make_pair(topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb })); 485 + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
  486 + m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) );
455 } 487 }
456 - return subscribeInternal(topic, qos); 488 + return subscribeInternal( topic, qos );
457 } 489 }
458 490
459 void ClientPaho::resubscribe() 491 void ClientPaho::resubscribe()
460 { 492 {
461 - decltype(m_pendingSubscriptions) pendingSubscriptions{}; 493 + decltype( m_pendingSubscriptions ) pendingSubscriptions{};
462 { 494 {
463 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 495 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
464 - std::copy(m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end())); 496 + std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) );
465 } 497 }
466 498
467 - for (const auto& s : pendingSubscriptions) 499 + for( const auto& s : pendingSubscriptions )
468 { 500 {
469 - subscribeInternal(s.first, s.second.qos); 501 + subscribeInternal( s.first, s.second.qos );
470 } 502 }
471 } 503 }
472 504
473 std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) 505 std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos )
474 { 506 {
475 { 507 {
476 - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 508 + OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
477 bool found = false; 509 bool found = false;
478 - for (const auto& s : m_subscriptions) 510 + for( const auto& s : m_subscriptions )
479 { 511 {
480 - if (topic == s.first && qos == s.second.qos) 512 + if( topic == s.first && qos == s.second.qos )
481 { 513 {
482 found = true; 514 found = true;
483 break; 515 break;
484 } 516 }
485 } 517 }
486 - if (!found) 518 + if( !found )
487 { 519 {
488 return -1; 520 return -1;
489 } 521 }
@@ -498,21 +530,21 @@ std::int32_t ClientPaho::unsubscribe( const std::string&amp; topic, int qos ) @@ -498,21 +530,21 @@ std::int32_t ClientPaho::unsubscribe( const std::string&amp; topic, int qos )
498 // Need to lock the mutex because it is possible that the callback is faster than 530 // Need to lock the mutex because it is possible that the callback is faster than
499 // the insertion of the token into the pending operations. 531 // the insertion of the token into the pending operations.
500 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 532 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
501 - auto rc = MQTTAsync_unsubscribe(m_client, topic.c_str(), &opts);  
502 - if (MQTTASYNC_SUCCESS != rc) 533 + auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts );
  534 + if( MQTTASYNC_SUCCESS != rc )
503 { 535 {
504 - // ("ClientPaho", "%1 - unsubscribe on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); 536 + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
505 } 537 }
506 538
507 - if (!m_pendingOperations.insert(opts.token).second) 539 + if( !m_pendingOperations.insert( opts.token ).second )
508 { 540 {
509 - // ("ClientPaho", "%1 unsubscribe - token %2 already in use", m_clientId, opts.token); 541 + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token ) + " already in use" ) );
510 } 542 }
511 543
512 - m_operationResult.erase(opts.token);  
513 - if (m_unsubscribeTokenToTopic.count(opts.token) > 0) 544 + m_operationResult.erase( opts.token );
  545 + if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 )
514 { 546 {
515 - // ("ClientPaho", "%1 - token already in use, replacing unsubscribe from topic %2 with topic %3", m_clientId, m_unsubscribeTokenToTopic[opts.token], topic); 547 + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) );
516 } 548 }
517 m_lastUnsubscribe = opts.token; // centos7 workaround 549 m_lastUnsubscribe = opts.token; // centos7 workaround
518 m_unsubscribeTokenToTopic[opts.token] = topic; 550 m_unsubscribeTokenToTopic[opts.token] = topic;
@@ -526,39 +558,42 @@ std::int32_t ClientPaho::unsubscribe( const std::string&amp; topic, int qos ) @@ -526,39 +558,42 @@ std::int32_t ClientPaho::unsubscribe( const std::string&amp; topic, int qos )
526 558
527 void ClientPaho::unsubscribeAll() 559 void ClientPaho::unsubscribeAll()
528 { 560 {
529 - decltype(m_subscriptions) subscriptions{}; 561 + decltype( m_subscriptions ) subscriptions{};
530 { 562 {
531 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 563 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
532 subscriptions = m_subscriptions; 564 subscriptions = m_subscriptions;
533 } 565 }
534 566
535 - for (const auto& s : subscriptions) {  
536 - this->unsubscribe(s.first, s.second.qos); 567 + for( const auto& s : subscriptions )
  568 + {
  569 + this->unsubscribe( s.first, s.second.qos );
537 } 570 }
538 } 571 }
539 572
540 std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const 573 std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const
541 { 574 {
542 - if (waitFor <= std::chrono::milliseconds(0)) {  
543 - return std::chrono::milliseconds(0); 575 + if( waitFor <= std::chrono::milliseconds( 0 ) )
  576 + {
  577 + return std::chrono::milliseconds( 0 );
544 } 578 }
545 std::chrono::milliseconds timeElapsed{}; 579 std::chrono::milliseconds timeElapsed{};
546 { 580 {
547 - osdev::components::mqtt::measurement::TimeMeasurement msr("waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds) 581 + osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds )
548 { 582 {
549 - timeElapsed = std::chrono::ceil<std::chrono::milliseconds>(sinceStart); 583 + timeElapsed = std::chrono::ceil<std::chrono::milliseconds>( sinceStart );
550 }); 584 });
551 std::unique_lock<std::mutex> lck(m_mutex); 585 std::unique_lock<std::mutex> lck(m_mutex);
  586 +
552 // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations); 587 // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations);
553 m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]() 588 m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]()
554 { 589 {
555 - if (tokens.empty()) 590 + if( tokens.empty() )
556 { // wait for all operations to end 591 { // wait for all operations to end
557 return m_pendingOperations.empty(); 592 return m_pendingOperations.empty();
558 } 593 }
559 - else if (tokens.size() == 1) 594 + else if( tokens.size() == 1 )
560 { 595 {
561 - return m_pendingOperations.find(*tokens.cbegin()) == m_pendingOperations.end(); 596 + return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end();
562 } 597 }
563 std::vector<std::int32_t> intersect{}; 598 std::vector<std::int32_t> intersect{};
564 std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect)); 599 std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect));
@@ -568,16 +603,16 @@ std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::millisecond @@ -568,16 +603,16 @@ std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::millisecond
568 return timeElapsed; 603 return timeElapsed;
569 } 604 }
570 605
571 -bool ClientPaho::isOverlapping(const std::string& topic) const 606 +bool ClientPaho::isOverlapping( const std::string& topic ) const
572 { 607 {
573 std::string existingTopic{}; 608 std::string existingTopic{};
574 - return isOverlapping(topic, existingTopic); 609 + return isOverlapping( topic, existingTopic );
575 } 610 }
576 611
577 -bool ClientPaho::isOverlapping(const std::string& topic, std::string& existingTopic) const 612 +bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const
578 { 613 {
579 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 614 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
580 - return isOverlappingInternal(topic, existingTopic); 615 + return isOverlappingInternal( topic, existingTopic );
581 } 616 }
582 617
583 std::vector<std::int32_t> ClientPaho::pendingOperations() const 618 std::vector<std::int32_t> ClientPaho::pendingOperations() const
@@ -585,7 +620,7 @@ std::vector&lt;std::int32_t&gt; ClientPaho::pendingOperations() const @@ -585,7 +620,7 @@ std::vector&lt;std::int32_t&gt; ClientPaho::pendingOperations() const
585 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 620 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
586 std::vector<std::int32_t> retval{}; 621 std::vector<std::int32_t> retval{};
587 retval.resize(m_pendingOperations.size()); 622 retval.resize(m_pendingOperations.size());
588 - std::copy(m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin()); 623 + std::copy( m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin() );
589 return retval; 624 return retval;
590 } 625 }
591 626
@@ -595,36 +630,36 @@ bool ClientPaho::hasPendingSubscriptions() const @@ -595,36 +630,36 @@ bool ClientPaho::hasPendingSubscriptions() const
595 return !m_pendingSubscriptions.empty(); 630 return !m_pendingSubscriptions.empty();
596 } 631 }
597 632
598 -boost::optional<bool> ClientPaho::operationResult(std::int32_t token) const 633 +boost::optional<bool> ClientPaho::operationResult( std::int32_t token ) const
599 { 634 {
600 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 635 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
601 boost::optional<bool> ret{}; 636 boost::optional<bool> ret{};
602 - auto cit = m_operationResult.find(token);  
603 - if (m_operationResult.end() != cit) 637 + auto cit = m_operationResult.find( token );
  638 + if( m_operationResult.end() != cit )
604 { 639 {
605 ret = cit->second; 640 ret = cit->second;
606 } 641 }
607 return ret; 642 return ret;
608 } 643 }
609 644
610 -void ClientPaho::parseEndpoint(const std::string& _endpoint) 645 +void ClientPaho::parseEndpoint( const std::string& _endpoint )
611 { 646 {
612 - auto ep = UriParser::parse(_endpoint);  
613 - if (ep.find("user") != ep.end()) 647 + auto ep = UriParser::parse( _endpoint );
  648 + if( ep.find( "user" ) != ep.end() )
614 { 649 {
615 m_username = ep["user"]; 650 m_username = ep["user"];
616 ep["user"].clear(); 651 ep["user"].clear();
617 } 652 }
618 653
619 - if (ep.find("password") != ep.end()) 654 + if( ep.find( "password" ) != ep.end() )
620 { 655 {
621 m_password = ep["password"]; 656 m_password = ep["password"];
622 ep["password"].clear(); 657 ep["password"].clear();
623 } 658 }
624 - m_endpoint = UriParser::toString(ep); 659 + m_endpoint = UriParser::toString( ep );
625 } 660 }
626 661
627 -std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos) 662 +std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos )
628 { 663 {
629 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; 664 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
630 opts.onSuccess = &ClientPaho::onPublishSuccess; 665 opts.onSuccess = &ClientPaho::onPublishSuccess;
@@ -637,21 +672,21 @@ std::int32_t ClientPaho::publishInternal(const MqttMessage&amp; message, int qos) @@ -637,21 +672,21 @@ std::int32_t ClientPaho::publishInternal(const MqttMessage&amp; message, int qos)
637 // the insertion of the token into the pending operations. 672 // the insertion of the token into the pending operations.
638 673
639 // OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 674 // OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
640 - auto rc = MQTTAsync_sendMessage(m_client, message.topic().c_str(), &msg, &opts);  
641 - if (MQTTASYNC_SUCCESS != rc) 675 + auto rc = MQTTAsync_sendMessage( m_client, message.topic().c_str(), &msg, &opts );
  676 + if( MQTTASYNC_SUCCESS != rc )
642 { 677 {
643 - // ("ClientPaho", "%1 - publish on topic %2 failed with code %3", m_clientId, message.topic(), pahoAsyncErrorCodeToString(rc)); 678 + LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
644 } 679 }
645 680
646 - if (!m_pendingOperations.insert(opts.token).second) 681 + if( !m_pendingOperations.insert( opts.token ).second )
647 { 682 {
648 - // ("ClientPaho", "%1 publishInternal - token %2 already in use", m_clientId, opts.token); 683 + // LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
649 } 684 }
650 - m_operationResult.erase(opts.token); 685 + m_operationResult.erase( opts.token );
651 return opts.token; 686 return opts.token;
652 } 687 }
653 688
654 -std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos) 689 +std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos )
655 { 690 {
656 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; 691 MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
657 opts.onSuccess = &ClientPaho::onSubscribeSuccess; 692 opts.onSuccess = &ClientPaho::onSubscribeSuccess;
@@ -661,52 +696,53 @@ std::int32_t ClientPaho::subscribeInternal(const std::string&amp; topic, int qos) @@ -661,52 +696,53 @@ std::int32_t ClientPaho::subscribeInternal(const std::string&amp; topic, int qos)
661 // Need to lock the mutex because it is possible that the callback is faster than 696 // Need to lock the mutex because it is possible that the callback is faster than
662 // the insertion of the token into the pending operations. 697 // the insertion of the token into the pending operations.
663 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 698 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
664 - auto rc = MQTTAsync_subscribe(m_client, topic.c_str(), qos, &opts); 699 + auto rc = MQTTAsync_subscribe( m_client, topic.c_str(), qos, &opts );
665 if (MQTTASYNC_SUCCESS != rc) 700 if (MQTTASYNC_SUCCESS != rc)
666 { 701 {
667 - m_pendingSubscriptions.erase(topic);  
668 - // ("ClientPaho", "%1 - subscription on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc));  
669 - // (MqttException, "Subscription failed"); 702 + m_pendingSubscriptions.erase( topic );
  703 + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
670 } 704 }
671 705
672 - if (!m_pendingOperations.insert(opts.token).second) 706 + if( !m_pendingOperations.insert( opts.token ).second )
673 { 707 {
674 - // ("ClientPaho", "%1 subscribe - token %2 already in use", m_clientId, opts.token); 708 + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) );
675 } 709 }
676 - m_operationResult.erase(opts.token);  
677 - if (m_subscribeTokenToTopic.count(opts.token) > 0) 710 + m_operationResult.erase( opts.token );
  711 + if( m_subscribeTokenToTopic.count( opts.token ) > 0 )
678 { 712 {
679 - // ("ClientPaho", "%1 - overwriting pending subscription on topic %2 with topic %3", m_clientId, m_subscribeTokenToTopic[opts.token], topic); 713 + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) );
680 } 714 }
681 m_subscribeTokenToTopic[opts.token] = topic; 715 m_subscribeTokenToTopic[opts.token] = topic;
682 return opts.token; 716 return opts.token;
683 } 717 }
684 718
685 -void ClientPaho::setConnectionStatus(ConnectionStatus status) 719 +void ClientPaho::setConnectionStatus( ConnectionStatus status )
686 { 720 {
  721 + LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - " ) );
687 ConnectionStatus curStatus = m_connectionStatus; 722 ConnectionStatus curStatus = m_connectionStatus;
688 m_connectionStatus = status; 723 m_connectionStatus = status;
689 - if (status != curStatus && m_connectionStatusCallback) 724 + if( status != curStatus && m_connectionStatusCallback )
690 { 725 {
691 - m_connectionStatusCallback(m_clientId, status); 726 + LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - Calling m_connectionStatusCallback" ) );
  727 + m_connectionStatusCallback( m_clientId, status );
692 } 728 }
693 } 729 }
694 730
695 -bool ClientPaho::isOverlappingInternal(const std::string& topic, std::string& existingTopic) const 731 +bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const
696 { 732 {
697 existingTopic.clear(); 733 existingTopic.clear();
698 - for (const auto& s : m_pendingSubscriptions) 734 + for( const auto& s : m_pendingSubscriptions )
699 { 735 {
700 - if (testForOverlap(s.first, topic)) 736 + if( testForOverlap( s.first, topic ) )
701 { 737 {
702 existingTopic = s.first; 738 existingTopic = s.first;
703 return true; 739 return true;
704 } 740 }
705 } 741 }
706 742
707 - for (const auto& s : m_subscriptions) 743 + for( const auto& s : m_subscriptions )
708 { 744 {
709 - if (testForOverlap(s.first, topic)) 745 + if( testForOverlap( s.first, topic ) )
710 { 746 {
711 existingTopic = s.first; 747 existingTopic = s.first;
712 return true; 748 return true;
@@ -715,87 +751,88 @@ bool ClientPaho::isOverlappingInternal(const std::string&amp; topic, std::string&amp; ex @@ -715,87 +751,88 @@ bool ClientPaho::isOverlappingInternal(const std::string&amp; topic, std::string&amp; ex
715 return false; 751 return false;
716 } 752 }
717 753
718 -void ClientPaho::pushIncomingEvent(std::function<void()> ev) 754 +void ClientPaho::pushIncomingEvent( std::function<void()> ev )
719 { 755 {
720 - m_callbackEventQueue.push(ev); 756 + m_callbackEventQueue.push( ev );
721 } 757 }
722 758
723 void ClientPaho::callbackEventHandler() 759 void ClientPaho::callbackEventHandler()
724 { 760 {
725 - // ("ClientPaho", "%1 - starting callback event handler", m_clientId);  
726 - for (;;) { 761 + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) );
  762 + for( ;; )
  763 + {
727 std::vector<std::function<void()>> events; 764 std::vector<std::function<void()>> events;
728 - if (!m_callbackEventQueue.pop(events)) 765 + if( !m_callbackEventQueue.pop(events) )
729 { 766 {
730 break; 767 break;
731 } 768 }
732 769
733 - for (const auto& ev : events) 770 + for( const auto& ev : events )
734 { 771 {
735 ev(); 772 ev();
736 - // ("ClientPaho", "%1 - Exception occurred: %2", m_clientId, mlogicException);  
737 } 773 }
738 } 774 }
739 - // ("ClientPaho", "%1 - leaving callback event handler", m_clientId); 775 + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - leaving callback event handler" ) );
740 } 776 }
741 -  
742 -void ClientPaho::onConnectOnInstance(const std::string& cause) 777 +void ClientPaho::onConnectOnInstance( const std::string& cause )
743 { 778 {
744 - (void)cause;  
745 - // toLogFile ("ClientPaho", "onConnectOnInstance %1 - reconnected (cause %2)", m_clientId, cause); 779 + (void) cause;
746 { 780 {
747 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 781 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
748 - std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end())); 782 + std::copy( m_subscriptions.begin(), m_subscriptions.end(), std::inserter( m_pendingSubscriptions, m_pendingSubscriptions.end() ) );
749 m_subscriptions.clear(); 783 m_subscriptions.clear();
750 m_processPendingPublishes = true; // all publishes are on hold until publishPending is called. 784 m_processPendingPublishes = true; // all publishes are on hold until publishPending is called.
751 } 785 }
752 786
753 - setConnectionStatus(ConnectionStatus::Connected); 787 + setConnectionStatus( ConnectionStatus::Connected );
754 } 788 }
755 789
756 -void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response) 790 +void ClientPaho::onConnectSuccessOnInstance()
757 { 791 {
758 - auto connectData = response.connectionData();  
759 - // ("ClientPaho", "onConnectSuccessOnInstance %1 - connected to endpoint %2 (mqtt version %3, session present %4)",  
760 - // m_clientId, connectData.serverUri(), connectData.mqttVersion(), connectData.sessionPresent()); 792 + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]",
  793 + std::string( m_clientId + " - onConnectSuccessOnInstance triggered." ) );
761 { 794 {
762 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 795 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
763 // Register the connect callback that is used in reconnect scenarios. 796 // Register the connect callback that is used in reconnect scenarios.
764 - auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect);  
765 - if (MQTTASYNC_SUCCESS != rc) 797 + auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect );
  798 + if( MQTTASYNC_SUCCESS != rc )
766 { 799 {
767 - // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the connected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); 800 + LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
768 } 801 }
  802 +
769 // For MQTTV5 803 // For MQTTV5
770 //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect); 804 //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
771 //if (MQTTASYNC_SUCCESS != rc) { 805 //if (MQTTASYNC_SUCCESS != rc) {
772 // // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); 806 // // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
773 //} 807 //}
774 // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); 808 // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
  809 +
775 m_operationResult[-100] = true; 810 m_operationResult[-100] = true;
776 m_pendingOperations.erase(-100); 811 m_pendingOperations.erase(-100);
777 } 812 }
778 - setConnectionStatus(ConnectionStatus::Connected);  
779 - if (m_connectPromise) 813 +
  814 + setConnectionStatus( ConnectionStatus::Connected );
  815 + if( m_connectPromise )
780 { 816 {
  817 + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) );
781 m_connectPromise->set_value(); 818 m_connectPromise->set_value();
782 } 819 }
783 m_operationsCompleteCV.notify_all(); 820 m_operationsCompleteCV.notify_all();
784 } 821 }
785 822
786 -void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) 823 +void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response )
787 { 824 {
788 - (void)response;  
789 - // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); 825 + (void) response;
  826 + LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
790 { 827 {
791 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 828 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
792 // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); 829 // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
793 m_operationResult[-100] = false; 830 m_operationResult[-100] = false;
794 m_pendingOperations.erase(-100); 831 m_pendingOperations.erase(-100);
795 } 832 }
796 - if (ConnectionStatus::ConnectInProgress == m_connectionStatus) 833 + if( ConnectionStatus::ConnectInProgress == m_connectionStatus )
797 { 834 {
798 - setConnectionStatus(ConnectionStatus::Disconnected); 835 + setConnectionStatus( ConnectionStatus::Disconnected );
799 } 836 }
800 m_operationsCompleteCV.notify_all(); 837 m_operationsCompleteCV.notify_all();
801 } 838 }
@@ -805,9 +842,9 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure&amp; response) @@ -805,9 +842,9 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure&amp; response)
805 // MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode)); 842 // MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode));
806 //} 843 //}
807 844
808 -void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&) 845 +void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& )
809 { 846 {
810 - // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - disconnected from endpoint %2", m_clientId, m_endpoint); 847 + LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) );
811 { 848 {
812 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 849 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
813 m_subscriptions.clear(); 850 m_subscriptions.clear();
@@ -820,45 +857,46 @@ void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&amp;) @@ -820,45 +857,46 @@ void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&amp;)
820 m_pendingOperations.clear(); 857 m_pendingOperations.clear();
821 } 858 }
822 859
823 - setConnectionStatus(ConnectionStatus::Disconnected); 860 + setConnectionStatus( ConnectionStatus::Disconnected );
824 861
825 - if (m_disconnectPromise) { 862 + if( m_disconnectPromise )
  863 + {
826 m_disconnectPromise->set_value(); 864 m_disconnectPromise->set_value();
827 } 865 }
828 m_operationsCompleteCV.notify_all(); 866 m_operationsCompleteCV.notify_all();
829 } 867 }
830 868
831 -void ClientPaho::onDisconnectFailureOnInstance(const MqttFailure& response) 869 +void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response )
832 { 870 {
833 - (void)response;  
834 - // ("ClientPaho", "onDisconnectFailureOnInstance %1 - disconnect failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); 871 + (void) response;
  872 + LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) );
835 { 873 {
836 - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 874 + OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
837 // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations); 875 // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations);
838 m_operationResult[-200] = false; 876 m_operationResult[-200] = false;
839 m_pendingOperations.erase(-200); 877 m_pendingOperations.erase(-200);
840 } 878 }
841 879
842 - if (MQTTAsync_isConnected(m_client)) 880 + if( MQTTAsync_isConnected( m_client ) )
843 { 881 {
844 - setConnectionStatus(ConnectionStatus::Connected); 882 + setConnectionStatus( ConnectionStatus::Connected );
845 } 883 }
846 else 884 else
847 { 885 {
848 - setConnectionStatus(ConnectionStatus::Disconnected); 886 + setConnectionStatus( ConnectionStatus::Disconnected );
849 } 887 }
850 888
851 - if (m_disconnectPromise) 889 + if( m_disconnectPromise )
852 { 890 {
853 m_disconnectPromise->set_value(); 891 m_disconnectPromise->set_value();
854 } 892 }
855 m_operationsCompleteCV.notify_all(); 893 m_operationsCompleteCV.notify_all();
856 } 894 }
857 895
858 -void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess& response) 896 +void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
859 { 897 {
860 auto pd = response.publishData(); 898 auto pd = response.publishData();
861 - // ("ClientPaho", "onPublishSuccessOnInstance %1 - publish with token %2 succeeded (message was %3)", m_clientId, response.token(), pd.payload()); 899 + // LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
862 { 900 {
863 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 901 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
864 // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); 902 // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
@@ -868,9 +906,9 @@ void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess&amp; response) @@ -868,9 +906,9 @@ void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess&amp; response)
868 m_operationsCompleteCV.notify_all(); 906 m_operationsCompleteCV.notify_all();
869 } 907 }
870 908
871 -void ClientPaho::onPublishFailureOnInstance(const MqttFailure& response) 909 +void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response )
872 { 910 {
873 - // ("ClientPaho", "onPublishFailureOnInstance %1 - publish with token %2 failed with code %3 (%4)", m_clientId, response.token(), response.codeToString(), response.message()); 911 + LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
874 { 912 {
875 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 913 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
876 // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); 914 // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
@@ -880,78 +918,82 @@ void ClientPaho::onPublishFailureOnInstance(const MqttFailure&amp; response) @@ -880,78 +918,82 @@ void ClientPaho::onPublishFailureOnInstance(const MqttFailure&amp; response)
880 m_operationsCompleteCV.notify_all(); 918 m_operationsCompleteCV.notify_all();
881 } 919 }
882 920
883 -void ClientPaho::onSubscribeSuccessOnInstance(const MqttSuccess& response) 921 +void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response )
884 { 922 {
885 - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscribe with token %2 succeeded", m_clientId, response.token());  
886 - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); 923 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) );
  924 +
  925 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
887 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 926 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
888 bool operationOk = false; 927 bool operationOk = false;
889 - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response, &operationOk]() 928 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]()
890 { 929 {
891 // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); 930 // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
892 m_operationResult[response.token()] = operationOk; 931 m_operationResult[response.token()] = operationOk;
893 - m_pendingOperations.erase(response.token()); 932 + m_pendingOperations.erase( response.token() );
894 }); 933 });
895 - auto it = m_subscribeTokenToTopic.find(response.token());  
896 - if (m_subscribeTokenToTopic.end() == it) {  
897 - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, response.token()); 934 + auto it = m_subscribeTokenToTopic.find( response.token() );
  935 + if( m_subscribeTokenToTopic.end() == it )
  936 + {
  937 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
898 return; 938 return;
899 } 939 }
900 auto topic = it->second; 940 auto topic = it->second;
901 m_subscribeTokenToTopic.erase(it); 941 m_subscribeTokenToTopic.erase(it);
902 942
903 - auto pendingIt = m_pendingSubscriptions.find(topic);  
904 - if (m_pendingSubscriptions.end() == pendingIt) 943 + auto pendingIt = m_pendingSubscriptions.find( topic );
  944 + if( m_pendingSubscriptions.end() == pendingIt )
905 { 945 {
906 - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); 946 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
907 return; 947 return;
908 } 948 }
909 - if (response.qos() != pendingIt->second.qos) 949 + if( response.qos() != pendingIt->second.qos )
910 { 950 {
911 - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscription requested qos %2, endpoint assigned qos %3", m_clientId, pendingIt->second.qos, response.qos()); 951 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) );
912 } 952 }
913 - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - move pending subscription on topic %2 to the registered subscriptions", m_clientId, topic);  
914 - m_subscriptions.emplace(std::make_pair(pendingIt->first, std::move(pendingIt->second)));  
915 - m_pendingSubscriptions.erase(pendingIt); 953 +
  954 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) );
  955 + m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) );
  956 + m_pendingSubscriptions.erase( pendingIt );
916 operationOk = true; 957 operationOk = true;
917 } 958 }
918 959
919 -void ClientPaho::onSubscribeFailureOnInstance(const MqttFailure& response) 960 +void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response )
920 { 961 {
921 - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());  
922 - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); 962 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
  963 +
  964 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
923 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 965 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
924 - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() 966 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
925 { 967 {
926 // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); 968 // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
927 m_operationResult[response.token()] = false; 969 m_operationResult[response.token()] = false;
928 - m_pendingOperations.erase(response.token()); 970 + m_pendingOperations.erase( response.token() );
929 }); 971 });
930 972
931 - auto it = m_subscribeTokenToTopic.find(response.token());  
932 - if (m_subscribeTokenToTopic.end() == it) 973 + auto it = m_subscribeTokenToTopic.find( response.token() );
  974 + if( m_subscribeTokenToTopic.end() == it )
933 { 975 {
934 - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); 976 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
935 return; 977 return;
936 } 978 }
937 auto topic = it->second; 979 auto topic = it->second;
938 - m_subscribeTokenToTopic.erase(it); 980 + m_subscribeTokenToTopic.erase( it );
939 981
940 - auto pendingIt = m_pendingSubscriptions.find(topic);  
941 - if (m_pendingSubscriptions.end() == pendingIt) 982 + auto pendingIt = m_pendingSubscriptions.find( topic );
  983 + if( m_pendingSubscriptions.end() == pendingIt )
942 { 984 {
943 - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); 985 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
944 return; 986 return;
945 } 987 }
946 - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - remove pending subscription on topic %2", m_clientId, topic);  
947 - m_pendingSubscriptions.erase(pendingIt); 988 + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) );
  989 + m_pendingSubscriptions.erase( pendingIt );
948 } 990 }
949 991
950 -void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response) 992 +void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response )
951 { 993 {
952 - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unsubscribe with token %2 succeeded", m_clientId, response.token()); 994 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) );
953 995
954 - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); 996 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
955 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 997 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
956 998
957 // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token. 999 // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token.
@@ -960,116 +1002,117 @@ void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess&amp; response) @@ -960,116 +1002,117 @@ void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess&amp; response)
960 // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled 1002 // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled
961 // sequentially (see ClientPaho::unsubscribe)! 1003 // sequentially (see ClientPaho::unsubscribe)!
962 auto token = response.token(); 1004 auto token = response.token();
963 - if (-1 == token) 1005 + if( -1 == token )
964 { 1006 {
965 token = m_lastUnsubscribe; 1007 token = m_lastUnsubscribe;
966 m_lastUnsubscribe = -1; 1008 m_lastUnsubscribe = -1;
967 } 1009 }
968 1010
969 bool operationOk = false; 1011 bool operationOk = false;
970 - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, token, &operationOk]() 1012 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]()
971 { 1013 {
972 // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token); 1014 // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token);
973 m_operationResult[token] = operationOk; 1015 m_operationResult[token] = operationOk;
974 - m_pendingOperations.erase(token); 1016 + m_pendingOperations.erase( token );
975 }); 1017 });
976 1018
977 - auto it = m_unsubscribeTokenToTopic.find(token);  
978 - if (m_unsubscribeTokenToTopic.end() == it) 1019 + auto it = m_unsubscribeTokenToTopic.find( token );
  1020 + if( m_unsubscribeTokenToTopic.end() == it )
979 { 1021 {
980 - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, token); 1022 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) );
981 return; 1023 return;
982 } 1024 }
983 auto topic = it->second; 1025 auto topic = it->second;
984 - m_unsubscribeTokenToTopic.erase(it); 1026 + m_unsubscribeTokenToTopic.erase( it );
985 1027
986 - auto registeredIt = m_subscriptions.find(topic);  
987 - if (m_subscriptions.end() == registeredIt) {  
988 - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - cannot find subscription for token %2", m_clientId, response.token()); 1028 + auto registeredIt = m_subscriptions.find( topic );
  1029 + if( m_subscriptions.end() == registeredIt )
  1030 + {
  1031 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) );
989 return; 1032 return;
990 } 1033 }
991 - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - remove subscription on topic %2 from the registered subscriptions", m_clientId, topic);  
992 - m_subscriptions.erase(registeredIt); 1034 +
  1035 + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) );
  1036 + m_subscriptions.erase( registeredIt );
993 operationOk = true; 1037 operationOk = true;
994 } 1038 }
995 1039
996 -void ClientPaho::onUnsubscribeFailureOnInstance(const MqttFailure& response) 1040 +void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response )
997 { 1041 {
998 - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message());  
999 - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); 1042 + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
  1043 + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
1000 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 1044 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
1001 - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() 1045 + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
1002 { 1046 {
1003 // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); 1047 // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
1004 m_operationResult[response.token()] = false; 1048 m_operationResult[response.token()] = false;
1005 - m_pendingOperations.erase(response.token()); 1049 + m_pendingOperations.erase( response.token() );
1006 }); 1050 });
1007 1051
1008 - auto it = m_unsubscribeTokenToTopic.find(response.token());  
1009 - if (m_unsubscribeTokenToTopic.end() == it) 1052 + auto it = m_unsubscribeTokenToTopic.find( response.token() );
  1053 + if( m_unsubscribeTokenToTopic.end() == it )
1010 { 1054 {
1011 - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); 1055 + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
1012 return; 1056 return;
1013 } 1057 }
1014 auto topic = it->second; 1058 auto topic = it->second;
1015 - m_unsubscribeTokenToTopic.erase(it); 1059 + m_unsubscribeTokenToTopic.erase( it );
1016 } 1060 }
1017 1061
1018 -int ClientPaho::onMessageArrivedOnInstance(const MqttMessage& message) 1062 +int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message )
1019 { 1063 {
1020 - // ("ClientPaho", "onMessageArrivedOnInstance %1 - received message on topic %2, retained : %3, dup : %4", m_clientId, message.topic(), message.retained(), message.duplicate());  
1021 -  
1022 - std::function<void(MqttMessage)> cb; 1064 + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) );
1023 1065
  1066 + std::function<void( MqttMessage )> cb;
1024 { 1067 {
1025 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 1068 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
1026 - for (const auto& s : m_subscriptions) 1069 + for( const auto& s : m_subscriptions )
1027 { 1070 {
1028 - if (boost::regex_match(message.topic(), s.second.topicRegex)) 1071 + if( boost::regex_match( message.topic(), s.second.topicRegex ) )
1029 { 1072 {
1030 cb = s.second.callback; 1073 cb = s.second.callback;
1031 } 1074 }
1032 } 1075 }
1033 } 1076 }
1034 1077
1035 - if (cb) 1078 + if( cb )
1036 { 1079 {
1037 - cb(message); 1080 + cb( message );
1038 } 1081 }
1039 else 1082 else
1040 { 1083 {
1041 - // ("ClientPaho", "onMessageArrivedOnInstance %1 - no topic filter found for message received on topic %2", m_clientId, message.topic()); 1084 + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) );
1042 } 1085 }
1043 return 1; 1086 return 1;
1044 } 1087 }
1045 1088
1046 -void ClientPaho::onDeliveryCompleteOnInstance(MQTTAsync_token token) 1089 +void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token )
1047 { 1090 {
1048 - // ("ClientPaho", "onDeliveryCompleteOnInstance %1 - message with token %2 is delivered", m_clientId, token);  
1049 - if (m_deliveryCompleteCallback) 1091 + LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) );
  1092 + if( m_deliveryCompleteCallback )
1050 { 1093 {
1051 - m_deliveryCompleteCallback(m_clientId, static_cast<std::int32_t>(token)); 1094 + m_deliveryCompleteCallback( m_clientId, static_cast<std::int32_t>( token ) );
1052 } 1095 }
1053 } 1096 }
1054 1097
1055 -void ClientPaho::onConnectionLostOnInstance(const std::string& cause) 1098 +void ClientPaho::onConnectionLostOnInstance( const std::string& cause )
1056 { 1099 {
1057 - (void)cause; 1100 + (void) cause;
1058 // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause); 1101 // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause);
1059 - setConnectionStatus(ConnectionStatus::ReconnectInProgress); 1102 + setConnectionStatus( ConnectionStatus::ReconnectInProgress );
1060 1103
1061 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 1104 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
1062 // Remove all tokens related to subscriptions from the active operations. 1105 // Remove all tokens related to subscriptions from the active operations.
1063 - for (const auto& p : m_subscribeTokenToTopic) 1106 + for( const auto& p : m_subscribeTokenToTopic )
1064 { 1107 {
1065 // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); 1108 // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
1066 - m_pendingOperations.erase(p.first); 1109 + m_pendingOperations.erase( p.first );
1067 } 1110 }
1068 1111
1069 - for (const auto& p : m_unsubscribeTokenToTopic) 1112 + for( const auto& p : m_unsubscribeTokenToTopic )
1070 { 1113 {
1071 // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); 1114 // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
1072 - m_pendingOperations.erase(p.first); 1115 + m_pendingOperations.erase( p.first );
1073 } 1116 }
1074 // Clear the administration used in the subscribe process. 1117 // Clear the administration used in the subscribe process.
1075 m_subscribeTokenToTopic.clear(); 1118 m_subscribeTokenToTopic.clear();
@@ -1077,39 +1120,55 @@ void ClientPaho::onConnectionLostOnInstance(const std::string&amp; cause) @@ -1077,39 +1120,55 @@ void ClientPaho::onConnectionLostOnInstance(const std::string&amp; cause)
1077 } 1120 }
1078 1121
1079 // static 1122 // static
1080 -void ClientPaho::onConnect(void* context, char* cause) 1123 +void ClientPaho::onFirstConnect( void* context, char* cause )
1081 { 1124 {
1082 - if (context) 1125 + LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." );
  1126 + if( context )
1083 { 1127 {
1084 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1085 - std::string reason(nullptr == cause ? "unknown cause" : cause);  
1086 - cl->pushIncomingEvent([cl, reason]() { cl->onConnectOnInstance(reason); }); 1128 + auto *cl = reinterpret_cast<ClientPaho*>( context );
  1129 + std::string reason( nullptr == cause ? "Unknown cause" : cause );
  1130 + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } );
  1131 + }
  1132 +}
  1133 +
  1134 +void ClientPaho::onConnect( void* context, char* cause )
  1135 +{
  1136 + LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." );
  1137 + if( context )
  1138 + {
  1139 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1140 + std::string reason( nullptr == cause ? "unknown cause" : cause );
  1141 + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } );
1087 } 1142 }
1088 } 1143 }
1089 1144
1090 // static 1145 // static
1091 -void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response) 1146 +void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response )
1092 { 1147 {
1093 - if (context) 1148 + LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." );
  1149 + if( context )
1094 { 1150 {
1095 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1096 - if (!response) { 1151 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1152 + if( !response )
  1153 + {
1097 // connect should always have a valid response struct. 1154 // connect should always have a valid response struct.
1098 - // ("ClientPaho", "onConnectSuccess - no response data"); 1155 + LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" );
  1156 + return;
1099 } 1157 }
1100 - MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));  
1101 - cl->pushIncomingEvent([cl, resp]() { cl->onConnectSuccessOnInstance(resp); }); 1158 + // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
  1159 + cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } );
1102 } 1160 }
1103 } 1161 }
1104 1162
1105 // static 1163 // static
1106 -void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response) 1164 +void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response )
1107 { 1165 {
1108 - if (context) 1166 + LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) );
  1167 + if( context )
1109 { 1168 {
1110 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1111 - MqttFailure resp(response);  
1112 - cl->pushIncomingEvent([cl, resp]() { cl->onConnectFailureOnInstance(resp); }); 1169 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1170 + MqttFailure resp( response );
  1171 + cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } );
1113 } 1172 }
1114 } 1173 }
1115 1174
@@ -1134,34 +1193,35 @@ void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response @@ -1134,34 +1193,35 @@ void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response
1134 //} 1193 //}
1135 1194
1136 // static 1195 // static
1137 -void ClientPaho::onDisconnectSuccess(void* context, MQTTAsync_successData* response) 1196 +void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response )
1138 { 1197 {
1139 - if (context) 1198 + if( context )
1140 { 1199 {
1141 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1142 - MqttSuccess resp(response ? response->token : 0);  
1143 - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectSuccessOnInstance(resp); }); 1200 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1201 + MqttSuccess resp( response ? response->token : 0 );
  1202 + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } );
1144 } 1203 }
1145 } 1204 }
1146 1205
1147 // static 1206 // static
1148 -void ClientPaho::onDisconnectFailure(void* context, MQTTAsync_failureData* response) 1207 +void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response )
1149 { 1208 {
1150 - if (context) 1209 + LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." );
  1210 + if( context )
1151 { 1211 {
1152 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1153 - MqttFailure resp(response);  
1154 - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectFailureOnInstance(resp); }); 1212 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1213 + MqttFailure resp( response );
  1214 + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } );
1155 } 1215 }
1156 } 1216 }
1157 1217
1158 // static 1218 // static
1159 -void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response) 1219 +void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response )
1160 { 1220 {
1161 - if (context) 1221 + if( context )
1162 { 1222 {
1163 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1164 - if (!response) 1223 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1224 + if( !response )
1165 { 1225 {
1166 // publish should always have a valid response struct. 1226 // publish should always have a valid response struct.
1167 // toLogFile ("ClientPaho", "onPublishSuccess - no response data"); 1227 // toLogFile ("ClientPaho", "onPublishSuccess - no response data");
@@ -1172,134 +1232,137 @@ void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response @@ -1172,134 +1232,137 @@ void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response
1172 } 1232 }
1173 1233
1174 // static 1234 // static
1175 -void ClientPaho::onPublishFailure(void* context, MQTTAsync_failureData* response) 1235 +void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response )
1176 { 1236 {
1177 - (void)response;  
1178 - if (context) 1237 + (void) response;
  1238 + if( context )
1179 { 1239 {
1180 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1181 - MqttFailure resp(response);  
1182 - cl->pushIncomingEvent([cl, resp]() { cl->onPublishFailureOnInstance(resp); }); 1240 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1241 + MqttFailure resp( response );
  1242 + cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } );
1183 } 1243 }
1184 } 1244 }
1185 1245
1186 // static 1246 // static
1187 -void ClientPaho::onSubscribeSuccess(void* context, MQTTAsync_successData* response) 1247 +void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response )
1188 { 1248 {
1189 - if (context) 1249 + if( context )
1190 { 1250 {
1191 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1192 - if (!response) 1251 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1252 + if( !response )
1193 { 1253 {
1194 // subscribe should always have a valid response struct. 1254 // subscribe should always have a valid response struct.
1195 // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data"); 1255 // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data");
1196 } 1256 }
1197 - MqttSuccess resp(response->token, response->alt.qos);  
1198 - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeSuccessOnInstance(resp); }); 1257 + MqttSuccess resp( response->token, response->alt.qos );
  1258 + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } );
1199 } 1259 }
1200 } 1260 }
1201 1261
1202 // static 1262 // static
1203 -void ClientPaho::onSubscribeFailure(void* context, MQTTAsync_failureData* response) 1263 +void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response )
1204 { 1264 {
1205 - if (context) 1265 + if( context )
1206 { 1266 {
1207 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1208 - MqttFailure resp(response);  
1209 - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeFailureOnInstance(resp); }); 1267 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1268 + MqttFailure resp( response );
  1269 + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } );
1210 } 1270 }
1211 } 1271 }
1212 1272
1213 // static 1273 // static
1214 -void ClientPaho::onUnsubscribeSuccess(void* context, MQTTAsync_successData* response) 1274 +void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response )
1215 { 1275 {
1216 - if (context) 1276 + if( context )
1217 { 1277 {
1218 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1219 - MqttSuccess resp(response ? response->token : -1);  
1220 - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeSuccessOnInstance(resp); }); 1278 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1279 + MqttSuccess resp( response ? response->token : -1 );
  1280 + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } );
1221 } 1281 }
1222 } 1282 }
1223 1283
1224 // static 1284 // static
1225 -void ClientPaho::onUnsubscribeFailure(void* context, MQTTAsync_failureData* response) 1285 +void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response )
1226 { 1286 {
1227 - if (context) 1287 + if( context )
1228 { 1288 {
1229 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1230 - MqttFailure resp(response);  
1231 - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeFailureOnInstance(resp); }); 1289 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1290 + MqttFailure resp( response );
  1291 + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } );
1232 } 1292 }
1233 } 1293 }
1234 1294
1235 // static 1295 // static
1236 -int ClientPaho::onMessageArrived(void* context, char* topicName, int, MQTTAsync_message* message) 1296 +int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message )
1237 { 1297 {
1238 1298
1239 - OSDEV_COMPONENTS_SCOPEGUARD(freeMessage, [&topicName, &message]() 1299 + OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]()
1240 { 1300 {
1241 - MQTTAsync_freeMessage(&message);  
1242 - MQTTAsync_free(topicName); 1301 + MQTTAsync_freeMessage( &message );
  1302 + MQTTAsync_free( topicName );
1243 }); 1303 });
1244 1304
1245 - if (context) 1305 + if( context )
1246 { 1306 {
1247 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1248 - MqttMessage msg(topicName, *message);  
1249 - cl->pushIncomingEvent([cl, msg]() { cl->onMessageArrivedOnInstance(msg); }); 1307 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1308 + MqttMessage msg( topicName, *message );
  1309 + cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } );
1250 } 1310 }
1251 1311
1252 return 1; // always return true. Otherwise this callback is triggered again. 1312 return 1; // always return true. Otherwise this callback is triggered again.
1253 } 1313 }
1254 1314
1255 // static 1315 // static
1256 -void ClientPaho::onDeliveryComplete(void* context, MQTTAsync_token token) 1316 +void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token )
1257 { 1317 {
1258 - if (context) 1318 + if( context )
1259 { 1319 {
1260 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1261 - cl->pushIncomingEvent([cl, token]() { cl->onDeliveryCompleteOnInstance(token); }); 1320 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1321 + cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } );
1262 } 1322 }
1263 } 1323 }
1264 1324
1265 // static 1325 // static
1266 -void ClientPaho::onConnectionLost(void* context, char* cause) 1326 +void ClientPaho::onConnectionLost( void* context, char* cause )
1267 { 1327 {
1268 - OSDEV_COMPONENTS_SCOPEGUARD(freeCause, [&cause]() 1328 + OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]()
1269 { 1329 {
1270 - if (cause) 1330 + if( cause )
1271 { 1331 {
1272 - MQTTAsync_free(cause); 1332 + MQTTAsync_free( cause );
1273 } 1333 }
1274 }); 1334 });
1275 1335
1276 - if (context) 1336 + if( context )
1277 { 1337 {
1278 - auto* cl = reinterpret_cast<ClientPaho*>(context);  
1279 - std::string msg(nullptr == cause ? "cause unknown" : cause);  
1280 - cl->pushIncomingEvent([cl, msg]() { cl->onConnectionLostOnInstance(msg); }); 1338 + auto* cl = reinterpret_cast<ClientPaho*>( context );
  1339 + std::string msg( nullptr == cause ? "cause unknown" : cause );
  1340 + cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } );
1281 } 1341 }
1282 } 1342 }
1283 1343
1284 // static 1344 // static
1285 -void ClientPaho::onLogPaho(enum MQTTASYNC_TRACE_LEVELS level, char* message) 1345 +void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message )
1286 { 1346 {
1287 - (void)message;  
1288 - switch (level) 1347 + (void) message;
  1348 + switch( level )
1289 { 1349 {
1290 case MQTTASYNC_TRACE_MAXIMUM: 1350 case MQTTASYNC_TRACE_MAXIMUM:
1291 case MQTTASYNC_TRACE_MEDIUM: 1351 case MQTTASYNC_TRACE_MEDIUM:
1292 - case MQTTASYNC_TRACE_MINIMUM: { 1352 + case MQTTASYNC_TRACE_MINIMUM:
  1353 + {
1293 // ("ClientPaho", "paho - %1", message) 1354 // ("ClientPaho", "paho - %1", message)
1294 break; 1355 break;
1295 } 1356 }
1296 - case MQTTASYNC_TRACE_PROTOCOL: { 1357 + case MQTTASYNC_TRACE_PROTOCOL:
  1358 + {
1297 // ("ClientPaho", "paho - %1", message) 1359 // ("ClientPaho", "paho - %1", message)
1298 break; 1360 break;
1299 } 1361 }
1300 case MQTTASYNC_TRACE_ERROR: 1362 case MQTTASYNC_TRACE_ERROR:
1301 case MQTTASYNC_TRACE_SEVERE: 1363 case MQTTASYNC_TRACE_SEVERE:
1302 - case MQTTASYNC_TRACE_FATAL: { 1364 + case MQTTASYNC_TRACE_FATAL:
  1365 + {
1303 // ("ClientPaho", "paho - %1", message) 1366 // ("ClientPaho", "paho - %1", message)
1304 break; 1367 break;
1305 } 1368 }
src/connectionstatus.cpp
@@ -38,5 +38,5 @@ std::ostream&amp; operator&lt;&lt;(std::ostream &amp;os, ConnectionStatus rhs) @@ -38,5 +38,5 @@ std::ostream&amp; operator&lt;&lt;(std::ostream &amp;os, ConnectionStatus rhs)
38 case ConnectionStatus::Connected: 38 case ConnectionStatus::Connected:
39 return os << "Connected"; 39 return os << "Connected";
40 } 40 }
41 - return os << "Unknown"; // Should never be reached. 41 + return os << "Unknown"; // "Should" never be reached. ( But guess what? )
42 } 42 }
src/log.cpp
@@ -53,7 +53,7 @@ int toInt( LogLevel level ) @@ -53,7 +53,7 @@ int toInt( LogLevel level )
53 53
54 std::string Log::s_context = std::string(); 54 std::string Log::s_context = std::string();
55 std::string Log::s_fileName = std::string(); 55 std::string Log::s_fileName = std::string();
56 -LogLevel Log::s_logLevel = LogLevel::Error; 56 +LogLevel Log::s_logLevel = LogLevel::Debug;
57 LogMask Log::s_logMask = LogMask::None; 57 LogMask Log::s_logMask = LogMask::None;
58 58
59 void Log::init( const std::string& context, const std::string& logFile, LogLevel logDepth ) 59 void Log::init( const std::string& context, const std::string& logFile, LogLevel logDepth )
src/mqttclient.cpp
@@ -57,6 +57,7 @@ std::string generateUniqueClientId(const std::string&amp; clientId, std::size_t clie @@ -57,6 +57,7 @@ std::string generateUniqueClientId(const std::string&amp; clientId, std::size_t clie
57 MqttClient::MqttClient(const std::string& _clientId, const std::function<void(const Token& token)>& deliveryCompleteCallback) 57 MqttClient::MqttClient(const std::string& _clientId, const std::function<void(const Token& token)>& deliveryCompleteCallback)
58 : m_interfaceMutex() 58 : m_interfaceMutex()
59 , m_internalMutex() 59 , m_internalMutex()
  60 + , m_subscriptionMutex()
60 , m_endpoint() 61 , m_endpoint()
61 , m_clientId(_clientId) 62 , m_clientId(_clientId)
62 , m_activeTokens() 63 , m_activeTokens()
@@ -67,6 +68,7 @@ MqttClient::MqttClient(const std::string&amp; _clientId, const std::function&lt;void(co @@ -67,6 +68,7 @@ MqttClient::MqttClient(const std::string&amp; _clientId, const std::function&lt;void(co
67 , m_additionalClients() 68 , m_additionalClients()
68 , m_eventQueue(_clientId) 69 , m_eventQueue(_clientId)
69 , m_workerThread( std::thread( &MqttClient::eventHandler, this ) ) 70 , m_workerThread( std::thread( &MqttClient::eventHandler, this ) )
  71 + , m_deferredSubscriptions()
70 { 72 {
71 Log::init( "mqtt-library" ); 73 Log::init( "mqtt-library" );
72 LogInfo( "MQTT Client started", "[MqttClient::MqttClient]"); 74 LogInfo( "MQTT Client started", "[MqttClient::MqttClient]");
@@ -125,7 +127,7 @@ StateEnum MqttClient::state() const @@ -125,7 +127,7 @@ StateEnum MqttClient::state() const
125 return m_serverState.state(); 127 return m_serverState.state();
126 } 128 }
127 129
128 -void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt ) 130 +void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt, bool blocking )
129 { 131 {
130 osdev::components::mqtt::ParsedUri _endpoint = { 132 osdev::components::mqtt::ParsedUri _endpoint = {
131 { "scheme", "tcp" }, 133 { "scheme", "tcp" },
@@ -135,10 +137,10 @@ void MqttClient::connect(const std::string&amp; host, int port, const Credentials &amp;c @@ -135,10 +137,10 @@ void MqttClient::connect(const std::string&amp; host, int port, const Credentials &amp;c
135 { "port", std::to_string(port) } 137 { "port", std::to_string(port) }
136 }; 138 };
137 139
138 - this->connect( UriParser::toString( _endpoint ), lwt ); 140 + this->connect( UriParser::toString( _endpoint ), lwt, blocking );
139 } 141 }
140 142
141 -void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt ) 143 +void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt, bool blocking )
142 { 144 {
143 LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) ); 145 LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) );
144 146
@@ -171,7 +173,7 @@ void MqttClient::connect( const std::string &amp;_endpoint, const mqtt_LWT &amp;lwt ) @@ -171,7 +173,7 @@ void MqttClient::connect( const std::string &amp;_endpoint, const mqtt_LWT &amp;lwt )
171 client = m_principalClient.get(); 173 client = m_principalClient.get();
172 } 174 }
173 175
174 - client->connect( true, lwt ); 176 + client->connect( blocking, lwt );
175 } 177 }
176 178
177 void MqttClient::disconnect() 179 void MqttClient::disconnect()
@@ -248,26 +250,38 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi @@ -248,26 +250,38 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
248 IMqttClientImpl* client(nullptr); 250 IMqttClientImpl* client(nullptr);
249 { 251 {
250 // OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); 252 // OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
251 - if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) 253 + if (!m_principalClient || m_principalClient->connectionStatus() != ConnectionStatus::Connected)
252 { 254 {
253 LogError("MqttClient", std::string( m_clientId + " - Unable to subscribe, not connected" ) ); 255 LogError("MqttClient", std::string( m_clientId + " - Unable to subscribe, not connected" ) );
254 - // throw (?)(MqttException, "Not connected"); 256 + // Store the subscription in the buffer for later processing.
  257 + {
  258 + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex);
  259 + m_deferredSubscriptions.emplace_back( topic, qos, cb );
  260 + }
  261 +
255 return Token(m_clientId, -1); 262 return Token(m_clientId, -1);
256 } 263 }
257 - if (!m_principalClient->isOverlapping(topic)) { 264 +
  265 + if (!m_principalClient->isOverlapping(topic))
  266 + {
258 client = m_principalClient.get(); 267 client = m_principalClient.get();
259 clientFound = true; 268 clientFound = true;
260 } 269 }
261 - else {  
262 - for (const auto& c : m_additionalClients) {  
263 - if (!c->isOverlapping(topic)) { 270 + else
  271 + {
  272 + for (const auto& c : m_additionalClients)
  273 + {
  274 + if (!c->isOverlapping(topic))
  275 + {
264 client = c.get(); 276 client = c.get();
265 clientFound = true; 277 clientFound = true;
266 break; 278 break;
267 } 279 }
268 } 280 }
269 } 281 }
270 - if (!clientFound) { 282 +
  283 + if (!clientFound)
  284 + {
271 LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) ); 285 LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) );
272 std::string derivedClientId(generateUniqueClientId(m_clientId, m_additionalClients.size() + 2)); // principal client is nr 1. 286 std::string derivedClientId(generateUniqueClientId(m_clientId, m_additionalClients.size() + 2)); // principal client is nr 1.
273 m_additionalClients.emplace_back(std::make_unique<ClientPaho>( 287 m_additionalClients.emplace_back(std::make_unique<ClientPaho>(
@@ -278,8 +292,10 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi @@ -278,8 +292,10 @@ Token MqttClient::subscribe(const std::string&amp; topic, int qos, const std::functi
278 client = m_additionalClients.back().get(); 292 client = m_additionalClients.back().get();
279 } 293 }
280 } 294 }
281 - if (!clientFound) {  
282 - client->connect(true); 295 +
  296 + if (!clientFound)
  297 + {
  298 + client->connect( true );
283 } 299 }
284 return Token{ client->clientId(), client->subscribe(topic, qos, cb) }; 300 return Token{ client->clientId(), client->subscribe(topic, qos, cb) };
285 } 301 }
@@ -291,13 +307,15 @@ std::set&lt;Token&gt; MqttClient::unsubscribe(const std::string&amp; topic, int qos) @@ -291,13 +307,15 @@ std::set&lt;Token&gt; MqttClient::unsubscribe(const std::string&amp; topic, int qos)
291 std::vector<IMqttClientImpl*> clients{}; 307 std::vector<IMqttClientImpl*> clients{};
292 { 308 {
293 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); 309 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
294 - if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected) { 310 + if (!m_principalClient || m_principalClient->connectionStatus() == ConnectionStatus::Disconnected)
  311 + {
295 LogError("[MqttClient::unsubscribe]", std::string( m_clientId + " - Unable to unsubscribe, not connected" ) ); 312 LogError("[MqttClient::unsubscribe]", std::string( m_clientId + " - Unable to unsubscribe, not connected" ) );
296 // Throw (MqttException, "Not connected"); 313 // Throw (MqttException, "Not connected");
297 return std::set<Token>(); 314 return std::set<Token>();
298 } 315 }
299 clients.push_back(m_principalClient.get()); 316 clients.push_back(m_principalClient.get());
300 - for (const auto& c : m_additionalClients) { 317 + for (const auto& c : m_additionalClients)
  318 + {
301 clients.push_back(c.get()); 319 clients.push_back(c.get());
302 } 320 }
303 } 321 }
@@ -381,31 +399,54 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus @@ -381,31 +399,54 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
381 std::vector<ConnectionStatus> connectionStates{}; 399 std::vector<ConnectionStatus> connectionStates{};
382 { 400 {
383 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); 401 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
384 - if (!m_principalClient) {  
385 - return;  
386 - }  
387 - if (m_principalClient) { 402 +
  403 + if (m_principalClient)
  404 + {
388 principalClient = m_principalClient.get(); 405 principalClient = m_principalClient.get();
389 clients.push_back(principalClient); 406 clients.push_back(principalClient);
390 connectionStates.push_back(m_principalClient->connectionStatus()); 407 connectionStates.push_back(m_principalClient->connectionStatus());
391 } 408 }
392 - for (const auto& c : m_additionalClients) { 409 +
  410 + for (const auto& c : m_additionalClients)
  411 + {
393 clients.push_back(c.get()); 412 clients.push_back(c.get());
394 connectionStates.push_back(c->connectionStatus()); 413 connectionStates.push_back(c->connectionStatus());
395 } 414 }
396 } 415 }
  416 +
397 auto newState = determineState(connectionStates); 417 auto newState = determineState(connectionStates);
398 - bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);  
399 - if (resubscribe) { 418 + // bool resubscribe = (StateEnum::ConnectionFailure == m_serverState.state() && StateEnum::Good == newState);
  419 + bool resubscribe = ( StateEnum::Good == newState );
  420 + if (resubscribe)
  421 + {
  422 + // First activate pending subscriptions
  423 + {
  424 + OSDEV_COMPONENTS_LOCKGUARD(m_subscriptionMutex);
  425 + LogDebug( "[MqttClient::connectionsStatusChanged]", std::string( m_clientId + " - Number of pending subscriptions : " + std::to_string(m_deferredSubscriptions.size() ) ) );
  426 + while( m_deferredSubscriptions.size() > 0 )
  427 + {
  428 + auto subscription = m_deferredSubscriptions.at( 0 );
  429 + this->subscribe( subscription.getTopic(), subscription.getQoS(), subscription.getCallBack() );
  430 + m_deferredSubscriptions.erase( m_deferredSubscriptions.begin() );
  431 + }
  432 + }
  433 +
  434 + LogDebug( "[MqttClient::connectionStatusChanged]",
  435 + std::string( m_clientId + " - Resubscribing..." ) );
400 { 436 {
401 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex); 437 OSDEV_COMPONENTS_LOCKGUARD(m_internalMutex);
402 m_activeTokens.clear(); 438 m_activeTokens.clear();
403 } 439 }
404 - for (auto* cl : clients) {  
405 - try { 440 +
  441 + for (auto* cl : clients)
  442 + {
  443 + try
  444 + {
  445 + LogDebug( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - Client " + cl->clientId() + " has " + std::string( cl->hasPendingSubscriptions() ? "" : "no" ) + " pending subscriptions" ) );
406 cl->resubscribe(); 446 cl->resubscribe();
407 } 447 }
408 - catch (const std::exception& e) { 448 + catch (const std::exception& e)
  449 + {
409 LogError("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - resubscribe on wrapped client " + cl->clientId() + " in context of connection status change in wrapped client : " + id + " => FAILED : " + e.what() ) ); 450 LogError("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - resubscribe on wrapped client " + cl->clientId() + " in context of connection status change in wrapped client : " + id + " => FAILED : " + e.what() ) );
410 } 451 }
411 } 452 }
@@ -415,20 +456,32 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus @@ -415,20 +456,32 @@ void MqttClient::connectionStatusChanged(const std::string&amp; id, ConnectionStatus
415 // The server state change and a possible resubscription are done in the context of the MqttClient worker thread 456 // The server state change and a possible resubscription are done in the context of the MqttClient worker thread
416 // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions. 457 // The wrapper is free to pick up new work such as the acknowledment of the just recreated subscriptions.
417 this->pushEvent([this, resubscribe, clients, principalClient, newState]() { 458 this->pushEvent([this, resubscribe, clients, principalClient, newState]() {
418 - if (resubscribe) { 459 + if (resubscribe)
  460 + {
419 // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens. 461 // Just wait for the subscription commands to complete. We do not use waitForCompletionInternal because that call will always timeout when there are active tokens.
420 // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread. 462 // Active tokens are removed typically by work done on the worker thread. The wait action is also performed on the worker thread.
421 auto waitFor = std::chrono::milliseconds(1000); 463 auto waitFor = std::chrono::milliseconds(1000);
422 - if (!waitForCompletionInternalClients(clients, waitFor, std::set<Token>{})) {  
423 - if (std::accumulate(clients.begin(), clients.end(), false, [](bool hasPending, IMqttClientImpl* client) { return hasPending || client->hasPendingSubscriptions(); })) { 464 + if (!waitForCompletionInternalClients(clients, waitFor, std::set<Token>{}))
  465 + {
  466 + if (std::accumulate(clients.begin(),
  467 + clients.end(),
  468 + false,
  469 + [](bool hasPending, IMqttClientImpl* client)
  470 + {
  471 + return hasPending || client->hasPendingSubscriptions();
  472 + }))
  473 + {
424 LogWarning("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - subscriptions are not recovered within timeout." ) ); 474 LogWarning("[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - subscriptions are not recovered within timeout." ) );
425 } 475 }
426 } 476 }
427 - if (principalClient) {  
428 - try { 477 + if (principalClient)
  478 + {
  479 + try
  480 + {
429 principalClient->publishPending(); 481 principalClient->publishPending();
430 } 482 }
431 - catch (const std::exception& e) { 483 + catch (const std::exception& e)
  484 + {
432 LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) ); 485 LogError( "[MqttClient::connectionStatusChanged]", std::string( m_clientId + " - publishPending on wrapped client " + principalClient->clientId() + " => FAILED " + e.what() ) );
433 } 486 }
434 } 487 }