Commit 91223dca2633532a3b478c0d3d98c83f38303110
Merge branch 'fix/pgroen/deferred_connection' into 'development'
Fix/pgroen/deferred connection converted all logs to log.h format, except a few commented lines containing std::set params. <br> also made a number of syntax fixes and ran through the connection scenarios again, which are all working correctly now. See merge request !11
Showing
13 changed files
with
519 additions
and
359 deletions
CMakeLists.txt
debug_log.txt
0 โ 100644
1 | +With broker present | |
2 | + | |
3 | +Jul 02 00:50:19 intelnuc64.osdev.nl test_connection[30797]: MQTT Client started[30797]: [MqttClient::MqttClient] | |
4 | +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::eventHandler][30797]: ConnectionTest - starting event handler. | |
5 | +Jul 02 00:50:19 intelnuc64.osdev.nl MqttClient[30797]: ConnectionTest - Request connect | |
6 | +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho][30797]: ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - Setting the extra onConnected callback. | |
7 | +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][30797]: ConnectionTest - connection status of wrapped client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a changed to 2 | |
8 | +Jul 02 00:50:19 intelnuc64.osdev.nl ClientPaho[30797]: ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - starting callback event handler | |
9 | +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho::onConnectSuccess][30797]: onConnectSucces triggered.. | |
10 | +Jul 02 00:50:19 intelnuc64.osdev.nl [ClientPaho][30797]: onConnectSuccessOnInstance ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a - connected to endpoint localhost:1883 (mqtt version 4, session present FALSE ) | |
11 | +Jul 02 00:50:19 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][30797]: ConnectionTest - connection status of wrapped client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a changed to 4 | |
12 | +Jul 02 00:50:30 intelnuc64.osdev.nl mosquitto[29463]: 1656715830: Client ConnectionTest_1_83746b87-431f-4ce1-82e3-2543e9b0a37a closed its connection. | |
13 | + | |
14 | + | |
15 | +Without a broker present | |
16 | + | |
17 | +Jul 02 00:55:33 intelnuc64.osdev.nl test_connection[31574]: MQTT Client started[31574]: [MqttClient::MqttClient] | |
18 | +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::eventHandler][31574]: ConnectionTest - starting event handler. | |
19 | +Jul 02 00:55:33 intelnuc64.osdev.nl MqttClient[31574]: ConnectionTest - Request connect | |
20 | +Jul 02 00:55:33 intelnuc64.osdev.nl [ClientPaho][31574]: ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - Setting the extra onConnected callback. | |
21 | +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][31574]: ConnectionTest - connection status of wrapped client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 changed to 2 | |
22 | +Jul 02 00:55:33 intelnuc64.osdev.nl ClientPaho[31574]: ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - starting callback event handler | |
23 | +Jul 02 00:55:33 intelnuc64.osdev.nl ClientPaho[31574]: onConnectFailureOnInstanceConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 - connection failed with code MQTTASYNC_FAILURE (TCP/TLS connect failure) | |
24 | +Jul 02 00:55:33 intelnuc64.osdev.nl [MqttClient::connectionStatusChanged][31574]: ConnectionTest - connection status of wrapped client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 changed to 0 | |
25 | + | |
26 | +Jul 02 00:55:42 intelnuc64.osdev.nl systemd[1]: Starting Mosquitto MQTT Broker... | |
27 | +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: mosquitto version 2.0.14 starting | |
28 | +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Config loaded from /etc/mosquitto/mosquitto.conf. | |
29 | +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Starting in local only mode. Connections will only be possible from clients running on this machine. | |
30 | +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Create a configuration file which defines a listener to allow remote access. | |
31 | +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: For more details see https://mosquitto.org/documentation/authentication-methods/ | |
32 | +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Opening ipv4 listen socket on port 1883. | |
33 | +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: Opening ipv6 listen socket on port 1883. | |
34 | +Jul 02 00:55:42 intelnuc64.osdev.nl mosquitto[31610]: 1656716142: mosquitto version 2.0.14 running | |
35 | +Jul 02 00:55:45 intelnuc64.osdev.nl mosquitto[31610]: 1656716145: New connection from 127.0.0.1:42196 on port 1883. | |
36 | +Jul 02 00:55:45 intelnuc64.osdev.nl mosquitto[31610]: 1656716145: New client connected from 127.0.0.1:42196 as ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 (p2, c1, k5). | |
37 | +Jul 02 00:55:53 intelnuc64.osdev.nl mosquitto[31610]: 1656716153: Client ConnectionTest_1_9eaee6bb-325b-406a-ae70-b4f3e7a41268 closed its connection. | |
38 | + | |
39 | + | ... | ... |
examples/connect/CMakeLists.txt
0 โ 100644
1 | +cmake_minimum_required(VERSION 3.0) | |
2 | +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/submodules/cmake) | |
3 | + | |
4 | +include(projectheader) | |
5 | +project_header(test_connections) | |
6 | + | |
7 | +include_directories( SYSTEM | |
8 | + ${CMAKE_CURRENT_SOURCE_DIR}/../../include | |
9 | +) | |
10 | + | |
11 | +include(compiler) | |
12 | +set(SRC_LIST | |
13 | + ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp | |
14 | +) | |
15 | + | |
16 | +add_executable( ${PROJECT_NAME} | |
17 | + ${SRC_LIST} | |
18 | +) | |
19 | + | |
20 | +target_link_libraries( | |
21 | + ${PROJECT_NAME} | |
22 | + mqtt-cpp | |
23 | +) | |
24 | + | |
25 | +set_target_properties( ${PROJECT_NAME} PROPERTIES | |
26 | + RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin | |
27 | + LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib | |
28 | + ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib | |
29 | +) | |
30 | + | |
31 | +include(installation) | |
32 | +install_application() | ... | ... |
examples/connect/main.cpp
0 โ 100644
1 | + | |
2 | +#include <iostream> | |
3 | +#include <unistd.h> | |
4 | + | |
5 | +#include "mqttclient.h" | |
6 | + | |
7 | +using namespace osdev::components::mqtt; | |
8 | + | |
9 | +int main( int argc, char* argv[] ) | |
10 | +{ | |
11 | + (void)argc; | |
12 | + (void)argv; | |
13 | + | |
14 | + MqttClient oClient( "ConnectionTest" ); | |
15 | + oClient.connect( "localhost", 1883, Credentials() ); | |
16 | + | |
17 | + unsigned int nCount = 0; | |
18 | + | |
19 | + while(1) | |
20 | + { | |
21 | + std::cout << "[" << std::to_string(nCount++) << "] MQTT Client status : " << oClient.state() << std::endl; | |
22 | + sleep( 1 ); | |
23 | + } | |
24 | + | |
25 | + return 0; | |
26 | +} | ... | ... |
examples/pub/main.cpp
examples/sub/main.cpp
... | ... | @@ -83,7 +83,7 @@ int main( int argc, char* argv[] ) |
83 | 83 | // Start a loop to give the subscriber the possibility to do its work. |
84 | 84 | while( 1 ) |
85 | 85 | { |
86 | - sleepcp( 1, T_MICRO ); // Sleep 1 Sec to give the scheduler the change to interfene. | |
86 | + sleepcp( 1, T_SECONDS ); // Sleep 1 Sec to give the scheduler the change to interfene. | |
87 | 87 | } |
88 | 88 | } |
89 | 89 | else | ... | ... |
include/clientpaho.h
... | ... | @@ -215,7 +215,7 @@ private: |
215 | 215 | * The connection status is set to Connected. |
216 | 216 | * @param response A success response with connection data. |
217 | 217 | */ |
218 | - void onConnectSuccessOnInstance(const MqttSuccess& response); | |
218 | + void onConnectSuccessOnInstance(); | |
219 | 219 | |
220 | 220 | /** |
221 | 221 | * @brief Callback that is called when a connect call fails after being sent to the endpoint. |
... | ... | @@ -298,6 +298,7 @@ private: |
298 | 298 | void onConnectionLostOnInstance(const std::string& cause); |
299 | 299 | |
300 | 300 | // Static callback functions that are registered on the paho library. Functions call their *OnInstance() counterparts. |
301 | + static void onFirstConnect(void* context, char* cause); | |
301 | 302 | static void onConnect(void* context, char* cause); |
302 | 303 | static void onConnectSuccess(void* context, MQTTAsync_successData* response); |
303 | 304 | static void onConnectFailure(void* context, MQTTAsync_failureData* response); | ... | ... |
include/imqttclient.h
... | ... | @@ -60,13 +60,13 @@ public: |
60 | 60 | * @param port The port to use. |
61 | 61 | * @param credentials The credentials to use. |
62 | 62 | */ |
63 | - virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; | |
63 | + virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0; | |
64 | 64 | |
65 | 65 | /** |
66 | 66 | * @brief Connect to the endpoint |
67 | 67 | * @param endpoint an uri endpoint description. |
68 | 68 | */ |
69 | - virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; | |
69 | + virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) = 0; | |
70 | 70 | |
71 | 71 | /** |
72 | 72 | * @brief Disconnect the client from the broker | ... | ... |
include/mqttclient.h
... | ... | @@ -92,12 +92,12 @@ public: |
92 | 92 | /** |
93 | 93 | * @see IMqttClient |
94 | 94 | */ |
95 | - virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT() ) override; | |
95 | + virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override; | |
96 | 96 | |
97 | 97 | /** |
98 | 98 | * @see IMqttClient |
99 | 99 | */ |
100 | - virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) override; | |
100 | + virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT(), bool blocking = false ) override; | |
101 | 101 | |
102 | 102 | /** |
103 | 103 | * @see IMqttClient | ... | ... |
src/clientpaho.cpp
... | ... | @@ -24,6 +24,7 @@ |
24 | 24 | #include "errorcode.h" |
25 | 25 | #include "mqttutil.h" |
26 | 26 | #include "lockguard.h" |
27 | +#include "log.h" | |
27 | 28 | #include "metaprogrammingdefs.h" |
28 | 29 | #include "mqttstream.h" |
29 | 30 | #include "scopeguard.h" |
... | ... | @@ -93,10 +94,10 @@ struct Init |
93 | 94 | |
94 | 95 | std::atomic_int ClientPaho::s_numberOfInstances(0); |
95 | 96 | |
96 | -ClientPaho::ClientPaho(const std::string& _endpoint, | |
97 | +ClientPaho::ClientPaho( const std::string& _endpoint, | |
97 | 98 | const std::string& _id, |
98 | - const std::function<void(const std::string&, ConnectionStatus)>& connectionStatusCallback, | |
99 | - const std::function<void(const std::string& clientId, std::int32_t pubMsgToken)>& deliveryCompleteCallback) | |
99 | + const std::function<void( const std::string&, ConnectionStatus )>& connectionStatusCallback, | |
100 | + const std::function<void( const std::string& clientId, std::int32_t pubMsgToken )>& deliveryCompleteCallback ) | |
100 | 101 | : m_mutex() |
101 | 102 | , m_endpoint() |
102 | 103 | , m_username() |
... | ... | @@ -122,51 +123,55 @@ ClientPaho::ClientPaho(const std::string& _endpoint, |
122 | 123 | , m_callbackEventQueue(m_clientId) |
123 | 124 | , m_workerThread() |
124 | 125 | { |
125 | - if (0 == s_numberOfInstances++) { | |
126 | - MQTTAsync_setTraceCallback(&ClientPaho::onLogPaho); | |
126 | + if( 0 == s_numberOfInstances++ ) | |
127 | + { | |
128 | + MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho ); | |
127 | 129 | } |
128 | - // MLOGIC_COMMON_DEBUG("ClientPaho", "%1 - ctor ClientPaho %2", m_clientId, this); | |
130 | + | |
131 | + LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) ); | |
129 | 132 | parseEndpoint(_endpoint); |
130 | - auto rc = MQTTAsync_create(&m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr); | |
131 | - if (MQTTASYNC_SUCCESS == rc) | |
133 | + | |
134 | + auto rc = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr ); | |
135 | + if( MQTTASYNC_SUCCESS == rc ) | |
132 | 136 | { |
133 | - MQTTAsync_setCallbacks(m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete); | |
134 | - m_workerThread = std::thread(&ClientPaho::callbackEventHandler, this); | |
137 | + MQTTAsync_setCallbacks( m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete ); | |
138 | + m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this ); | |
135 | 139 | } |
136 | 140 | else |
137 | 141 | { |
138 | - // Do something sensible here. | |
142 | + LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( rc ) ) ); | |
139 | 143 | } |
140 | 144 | } |
141 | 145 | |
142 | 146 | ClientPaho::~ClientPaho() |
143 | 147 | { |
148 | + LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) ); | |
144 | 149 | if( MQTTAsync_isConnected( m_client ) ) |
145 | 150 | { |
146 | 151 | this->unsubscribeAll(); |
147 | 152 | |
148 | - this->waitForCompletion(std::chrono::milliseconds(2000), std::set<int32_t>{}); | |
149 | - this->disconnect(true, 5000); | |
153 | + this->waitForCompletion( std::chrono::milliseconds(2000), std::set<int32_t>{} ); | |
154 | + this->disconnect( true, 5000 ); | |
150 | 155 | } |
151 | 156 | else |
152 | 157 | { |
153 | 158 | // If the status was already disconnected this call does nothing |
154 | - setConnectionStatus(ConnectionStatus::Disconnected); | |
159 | + setConnectionStatus( ConnectionStatus::Disconnected ); | |
155 | 160 | } |
156 | 161 | |
157 | - if (0 == --s_numberOfInstances) | |
162 | + if( 0 == --s_numberOfInstances ) | |
158 | 163 | { |
159 | 164 | // encountered a case where termination of the logging system within paho led to a segfault. |
160 | 165 | // This was a paho thread that was cleaned while at the same time the logging system was terminated. |
161 | 166 | // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less |
162 | 167 | // frequently. |
163 | - MQTTAsync_setTraceCallback(nullptr); | |
168 | + MQTTAsync_setTraceCallback( nullptr ); | |
164 | 169 | } |
165 | 170 | |
166 | - MQTTAsync_destroy(&m_client); | |
171 | + MQTTAsync_destroy( &m_client ); | |
167 | 172 | |
168 | 173 | m_callbackEventQueue.stop(); |
169 | - if (m_workerThread.joinable()) | |
174 | + if( m_workerThread.joinable() ) | |
170 | 175 | { |
171 | 176 | m_workerThread.join(); |
172 | 177 | } |
... | ... | @@ -193,14 +198,28 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) |
193 | 198 | setConnectionStatus( ConnectionStatus::ConnectInProgress ); |
194 | 199 | } |
195 | 200 | |
201 | + LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) ); | |
202 | + | |
196 | 203 | MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; |
197 | - conn_opts.keepAliveInterval = 20; | |
204 | + conn_opts.keepAliveInterval = 5; | |
198 | 205 | conn_opts.cleansession = 1; |
199 | - conn_opts.onSuccess = &ClientPaho::onConnectSuccess; | |
206 | + conn_opts.onSuccess = nullptr; | |
200 | 207 | conn_opts.onFailure = &ClientPaho::onConnectFailure; |
201 | 208 | conn_opts.context = this; |
202 | 209 | conn_opts.automaticReconnect = 1; |
203 | 210 | |
211 | + // Make sure we get a signal if the promise is fulfilled | |
212 | + auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect ); | |
213 | + if( MQTTASYNC_SUCCESS == ccb ) | |
214 | + { | |
215 | + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") ); | |
216 | + } | |
217 | + else | |
218 | + { | |
219 | + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") ); | |
220 | + } | |
221 | + | |
222 | + // Setup the last will and testament, if so desired. | |
204 | 223 | if( !lwt.topic().empty() ) |
205 | 224 | { |
206 | 225 | MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer; |
... | ... | @@ -208,13 +227,14 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) |
208 | 227 | will_opts.topicName = lwt.topic().c_str(); |
209 | 228 | |
210 | 229 | conn_opts.will = &will_opts; |
230 | + | |
231 | + LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) ); | |
211 | 232 | } |
212 | 233 | else |
213 | 234 | { |
214 | 235 | conn_opts.will = nullptr; |
215 | 236 | } |
216 | 237 | |
217 | - | |
218 | 238 | if( !m_username.empty() ) |
219 | 239 | { |
220 | 240 | conn_opts.username = m_username.c_str(); |
... | ... | @@ -259,18 +279,19 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) |
259 | 279 | return -100; |
260 | 280 | } |
261 | 281 | |
262 | -std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) | |
282 | +std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs ) | |
263 | 283 | { |
264 | 284 | ConnectionStatus currentStatus = m_connectionStatus; |
265 | 285 | |
266 | 286 | { |
267 | 287 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
268 | - if (ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus) { | |
288 | + if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus ) | |
289 | + { | |
269 | 290 | return -1; |
270 | 291 | } |
271 | 292 | |
272 | 293 | currentStatus = m_connectionStatus; |
273 | - setConnectionStatus(ConnectionStatus::DisconnectInProgress); | |
294 | + setConnectionStatus( ConnectionStatus::DisconnectInProgress ); | |
274 | 295 | } |
275 | 296 | |
276 | 297 | MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize(); |
... | ... | @@ -282,43 +303,45 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) |
282 | 303 | std::promise<void> waitForDisconnectPromise{}; |
283 | 304 | auto waitForDisconnect = waitForDisconnectPromise.get_future(); |
284 | 305 | m_disconnectPromise.reset(); |
285 | - if (wait) { | |
306 | + if( wait ) | |
307 | + { | |
286 | 308 | m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise)); |
287 | 309 | } |
288 | 310 | |
289 | 311 | { |
290 | 312 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
291 | - if (!m_pendingOperations.insert(-200).second) | |
313 | + if( !m_pendingOperations.insert( -200 ).second ) | |
292 | 314 | { |
293 | - // "ClientPaho", "%1 disconnect - token %2 already in use", m_clientId, -200) | |
315 | + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) ); | |
294 | 316 | } |
295 | - m_operationResult.erase(-200); | |
317 | + m_operationResult.erase( -200 ); | |
296 | 318 | } |
297 | 319 | |
298 | - int rc = MQTTAsync_disconnect(m_client, &disconn_opts); | |
299 | - if (MQTTASYNC_SUCCESS != rc) { | |
300 | - if (MQTTASYNC_DISCONNECTED == rc) { | |
320 | + int rc = MQTTAsync_disconnect( m_client, &disconn_opts ); | |
321 | + if( MQTTASYNC_SUCCESS != rc ) | |
322 | + { | |
323 | + if( MQTTASYNC_DISCONNECTED == rc ) | |
324 | + { | |
301 | 325 | currentStatus = ConnectionStatus::Disconnected; |
302 | 326 | } |
303 | 327 | |
304 | - setConnectionStatus(currentStatus); | |
305 | - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | |
328 | + setConnectionStatus( currentStatus ); | |
329 | + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); | |
306 | 330 | m_operationResult[-200] = false; |
307 | - m_pendingOperations.erase(-200); | |
331 | + m_pendingOperations.erase( -200 ); | |
308 | 332 | |
309 | - if (MQTTASYNC_DISCONNECTED == rc) | |
333 | + if( MQTTASYNC_DISCONNECTED == rc ) | |
310 | 334 | { |
311 | 335 | return -1; |
312 | 336 | } |
313 | - // ("ClientPaho", "%1 - failed to disconnect, return code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); | |
337 | + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) ); | |
314 | 338 | } |
315 | 339 | |
316 | 340 | if( wait ) |
317 | 341 | { |
318 | 342 | if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100))) |
319 | 343 | { |
320 | - // ("ClientPaho", "%1 - timeout occurred on disconnect", m_clientId); | |
321 | - | |
344 | + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) ); | |
322 | 345 | } |
323 | 346 | waitForDisconnect.get(); |
324 | 347 | m_disconnectPromise.reset(); |
... | ... | @@ -326,67 +349,67 @@ std::int32_t ClientPaho::disconnect(bool wait, int timeoutMs) |
326 | 349 | return -200; |
327 | 350 | } |
328 | 351 | |
329 | -std::int32_t ClientPaho::publish(const MqttMessage& message, int qos) | |
352 | +std::int32_t ClientPaho::publish( const MqttMessage& message, int qos ) | |
330 | 353 | { |
331 | - if (ConnectionStatus::DisconnectInProgress == m_connectionStatus) | |
354 | + if( ConnectionStatus::DisconnectInProgress == m_connectionStatus ) | |
332 | 355 | { |
333 | - // ("ClientPaho", "%1 - disconnect in progress, ignoring publish with qos %2 on topic %3", m_clientId, qos, message.topic()); | |
356 | + LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) ); | |
334 | 357 | return -1; |
335 | 358 | } |
336 | - else if (ConnectionStatus::Disconnected == m_connectionStatus) | |
359 | + else if( ConnectionStatus::Disconnected == m_connectionStatus ) | |
337 | 360 | { |
338 | - // ("ClientPaho", "%1 - unable to publish, not connected", m_clientId); | |
361 | + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) ); | |
339 | 362 | connect( true ); |
340 | 363 | } |
341 | 364 | |
342 | - if (!isValidTopic(message.topic())) | |
365 | + if( !isValidTopic(message.topic() ) ) | |
343 | 366 | { |
344 | - // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, message.topic()); | |
367 | + LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) ); | |
345 | 368 | } |
346 | 369 | |
347 | - if (qos > 2) | |
370 | + if( qos > 2 ) | |
348 | 371 | { |
349 | 372 | qos = 2; |
350 | 373 | } |
351 | - else if (qos < 0) | |
374 | + else if( qos < 0 ) | |
352 | 375 | { |
353 | 376 | qos = 0; |
354 | 377 | } |
355 | 378 | |
356 | - | |
357 | 379 | std::unique_lock<std::mutex> lck(m_mutex); |
358 | - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes) { | |
380 | + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes ) | |
381 | + { | |
359 | 382 | m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; }); |
360 | - if (ConnectionStatus::ReconnectInProgress == m_connectionStatus) { | |
361 | - // ("ClientPaho", "Adding publish to pending queue."); | |
362 | - m_pendingPublishes.push_front(Publish{ qos, message }); | |
383 | + if( ConnectionStatus::ReconnectInProgress == m_connectionStatus ) | |
384 | + { | |
385 | + LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." ); | |
386 | + m_pendingPublishes.push_front( Publish{ qos, message } ); | |
363 | 387 | return -1; |
364 | 388 | } |
365 | 389 | } |
366 | 390 | |
367 | - return publishInternal(message, qos); | |
391 | + return publishInternal( message, qos ); | |
368 | 392 | } |
369 | 393 | |
370 | 394 | void ClientPaho::publishPending() |
371 | 395 | { |
372 | 396 | { |
373 | 397 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
374 | - if (!m_processPendingPublishes) | |
398 | + if( !m_processPendingPublishes ) | |
375 | 399 | { |
376 | 400 | return; |
377 | 401 | } |
378 | 402 | } |
379 | 403 | |
380 | - if (ConnectionStatus::Connected != m_connectionStatus) | |
404 | + if( ConnectionStatus::Connected != m_connectionStatus ) | |
381 | 405 | { |
382 | - // MqttException, "Not connected"); | |
406 | + LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) ) | |
383 | 407 | } |
384 | 408 | |
385 | - while (!m_pendingPublishes.empty()) | |
409 | + while( !m_pendingPublishes.empty() ) | |
386 | 410 | { |
387 | 411 | const auto& pub = m_pendingPublishes.back(); |
388 | - publishInternal(pub.data, pub.qos); | |
389 | - // else ("ClientPaho", "%1 - pending publish on topic %2 failed : %3", m_clientId, pub.data.topic(), e.what()); | |
412 | + publishInternal( pub.data, pub.qos ); | |
390 | 413 | |
391 | 414 | m_pendingPublishes.pop_back(); |
392 | 415 | } |
... | ... | @@ -398,23 +421,23 @@ void ClientPaho::publishPending() |
398 | 421 | m_pendingPublishesReadyCV.notify_all(); |
399 | 422 | } |
400 | 423 | |
401 | -std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb) | |
424 | +std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb ) | |
402 | 425 | { |
403 | - if (ConnectionStatus::Connected != m_connectionStatus) | |
426 | + if( ConnectionStatus::Connected != m_connectionStatus ) | |
404 | 427 | { |
405 | 428 | // MqttException, "Not connected" |
406 | 429 | } |
407 | 430 | |
408 | - if (!isValidTopic(topic)) | |
431 | + if( !isValidTopic( topic ) ) | |
409 | 432 | { |
410 | 433 | // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic); |
411 | 434 | } |
412 | 435 | |
413 | - if (qos > 2) | |
436 | + if( qos > 2 ) | |
414 | 437 | { |
415 | 438 | qos = 2; |
416 | 439 | } |
417 | - else if (qos < 0) | |
440 | + else if( qos < 0 ) | |
418 | 441 | { |
419 | 442 | qos = 0; |
420 | 443 | } |
... | ... | @@ -423,21 +446,27 @@ std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std: |
423 | 446 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
424 | 447 | |
425 | 448 | auto itExisting = m_subscriptions.find(topic); |
426 | - if (m_subscriptions.end() != itExisting) { | |
427 | - if (itExisting->second.qos == qos) { | |
449 | + if( m_subscriptions.end() != itExisting ) | |
450 | + { | |
451 | + if( itExisting->second.qos == qos ) | |
452 | + { | |
428 | 453 | return -1; |
429 | 454 | } |
430 | 455 | // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic); |
431 | 456 | } |
432 | 457 | |
433 | 458 | auto itPending = m_pendingSubscriptions.find(topic); |
434 | - if (m_pendingSubscriptions.end() != itPending) { | |
435 | - if (itPending->second.qos == qos) { | |
436 | - auto itToken = std::find_if(m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; }); | |
437 | - if (m_subscribeTokenToTopic.end() != itToken) { | |
459 | + if( m_pendingSubscriptions.end() != itPending ) | |
460 | + { | |
461 | + if( itPending->second.qos == qos ) | |
462 | + { | |
463 | + auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; } ); | |
464 | + if( m_subscribeTokenToTopic.end() != itToken ) | |
465 | + { | |
438 | 466 | return itToken->first; |
439 | 467 | } |
440 | - else { | |
468 | + else | |
469 | + { | |
441 | 470 | return -1; |
442 | 471 | } |
443 | 472 | } |
... | ... | @@ -445,45 +474,45 @@ std::int32_t ClientPaho::subscribe(const std::string& topic, int qos, const std: |
445 | 474 | } |
446 | 475 | |
447 | 476 | std::string existingTopic{}; |
448 | - if (isOverlappingInternal(topic, existingTopic)) | |
477 | + if( isOverlappingInternal( topic, existingTopic ) ) | |
449 | 478 | { |
450 | 479 | // (OverlappingTopicException, "overlapping topic", existingTopic, topic); |
451 | 480 | } |
452 | 481 | |
453 | - // ("ClientPaho", "%1 - adding subscription on topic %2 to the pending subscriptions", m_clientId, topic); | |
454 | - m_pendingSubscriptions.emplace(std::make_pair(topic, Subscription{ qos, boost::regex(convertTopicToRegex(topic)), cb })); | |
482 | + LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) ); | |
483 | + m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) ); | |
455 | 484 | } |
456 | - return subscribeInternal(topic, qos); | |
485 | + return subscribeInternal( topic, qos ); | |
457 | 486 | } |
458 | 487 | |
459 | 488 | void ClientPaho::resubscribe() |
460 | 489 | { |
461 | - decltype(m_pendingSubscriptions) pendingSubscriptions{}; | |
490 | + decltype( m_pendingSubscriptions ) pendingSubscriptions{}; | |
462 | 491 | { |
463 | 492 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
464 | - std::copy(m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end())); | |
493 | + std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) ); | |
465 | 494 | } |
466 | 495 | |
467 | - for (const auto& s : pendingSubscriptions) | |
496 | + for( const auto& s : pendingSubscriptions ) | |
468 | 497 | { |
469 | - subscribeInternal(s.first, s.second.qos); | |
498 | + subscribeInternal( s.first, s.second.qos ); | |
470 | 499 | } |
471 | 500 | } |
472 | 501 | |
473 | 502 | std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) |
474 | 503 | { |
475 | 504 | { |
476 | - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | |
505 | + OSDEV_COMPONENTS_LOCKGUARD( m_mutex ); | |
477 | 506 | bool found = false; |
478 | - for (const auto& s : m_subscriptions) | |
507 | + for( const auto& s : m_subscriptions ) | |
479 | 508 | { |
480 | - if (topic == s.first && qos == s.second.qos) | |
509 | + if( topic == s.first && qos == s.second.qos ) | |
481 | 510 | { |
482 | 511 | found = true; |
483 | 512 | break; |
484 | 513 | } |
485 | 514 | } |
486 | - if (!found) | |
515 | + if( !found ) | |
487 | 516 | { |
488 | 517 | return -1; |
489 | 518 | } |
... | ... | @@ -498,21 +527,21 @@ std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) |
498 | 527 | // Need to lock the mutex because it is possible that the callback is faster than |
499 | 528 | // the insertion of the token into the pending operations. |
500 | 529 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
501 | - auto rc = MQTTAsync_unsubscribe(m_client, topic.c_str(), &opts); | |
502 | - if (MQTTASYNC_SUCCESS != rc) | |
530 | + auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts ); | |
531 | + if( MQTTASYNC_SUCCESS != rc ) | |
503 | 532 | { |
504 | - // ("ClientPaho", "%1 - unsubscribe on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); | |
533 | + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); | |
505 | 534 | } |
506 | 535 | |
507 | - if (!m_pendingOperations.insert(opts.token).second) | |
536 | + if( !m_pendingOperations.insert( opts.token ).second ) | |
508 | 537 | { |
509 | - // ("ClientPaho", "%1 unsubscribe - token %2 already in use", m_clientId, opts.token); | |
538 | + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token ) + " already in use" ) ); | |
510 | 539 | } |
511 | 540 | |
512 | - m_operationResult.erase(opts.token); | |
513 | - if (m_unsubscribeTokenToTopic.count(opts.token) > 0) | |
541 | + m_operationResult.erase( opts.token ); | |
542 | + if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 ) | |
514 | 543 | { |
515 | - // ("ClientPaho", "%1 - token already in use, replacing unsubscribe from topic %2 with topic %3", m_clientId, m_unsubscribeTokenToTopic[opts.token], topic); | |
544 | + LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) ); | |
516 | 545 | } |
517 | 546 | m_lastUnsubscribe = opts.token; // centos7 workaround |
518 | 547 | m_unsubscribeTokenToTopic[opts.token] = topic; |
... | ... | @@ -526,39 +555,42 @@ std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) |
526 | 555 | |
527 | 556 | void ClientPaho::unsubscribeAll() |
528 | 557 | { |
529 | - decltype(m_subscriptions) subscriptions{}; | |
558 | + decltype( m_subscriptions ) subscriptions{}; | |
530 | 559 | { |
531 | 560 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
532 | 561 | subscriptions = m_subscriptions; |
533 | 562 | } |
534 | 563 | |
535 | - for (const auto& s : subscriptions) { | |
536 | - this->unsubscribe(s.first, s.second.qos); | |
564 | + for( const auto& s : subscriptions ) | |
565 | + { | |
566 | + this->unsubscribe( s.first, s.second.qos ); | |
537 | 567 | } |
538 | 568 | } |
539 | 569 | |
540 | 570 | std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const |
541 | 571 | { |
542 | - if (waitFor <= std::chrono::milliseconds(0)) { | |
543 | - return std::chrono::milliseconds(0); | |
572 | + if( waitFor <= std::chrono::milliseconds( 0 ) ) | |
573 | + { | |
574 | + return std::chrono::milliseconds( 0 ); | |
544 | 575 | } |
545 | 576 | std::chrono::milliseconds timeElapsed{}; |
546 | 577 | { |
547 | - osdev::components::mqtt::measurement::TimeMeasurement msr("waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds) | |
578 | + osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds ) | |
548 | 579 | { |
549 | - timeElapsed = std::chrono::ceil<std::chrono::milliseconds>(sinceStart); | |
580 | + timeElapsed = std::chrono::ceil<std::chrono::milliseconds>( sinceStart ); | |
550 | 581 | }); |
551 | 582 | std::unique_lock<std::mutex> lck(m_mutex); |
583 | + | |
552 | 584 | // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations); |
553 | 585 | m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]() |
554 | 586 | { |
555 | - if (tokens.empty()) | |
587 | + if( tokens.empty() ) | |
556 | 588 | { // wait for all operations to end |
557 | 589 | return m_pendingOperations.empty(); |
558 | 590 | } |
559 | - else if (tokens.size() == 1) | |
591 | + else if( tokens.size() == 1 ) | |
560 | 592 | { |
561 | - return m_pendingOperations.find(*tokens.cbegin()) == m_pendingOperations.end(); | |
593 | + return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end(); | |
562 | 594 | } |
563 | 595 | std::vector<std::int32_t> intersect{}; |
564 | 596 | std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect)); |
... | ... | @@ -568,16 +600,16 @@ std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::millisecond |
568 | 600 | return timeElapsed; |
569 | 601 | } |
570 | 602 | |
571 | -bool ClientPaho::isOverlapping(const std::string& topic) const | |
603 | +bool ClientPaho::isOverlapping( const std::string& topic ) const | |
572 | 604 | { |
573 | 605 | std::string existingTopic{}; |
574 | - return isOverlapping(topic, existingTopic); | |
606 | + return isOverlapping( topic, existingTopic ); | |
575 | 607 | } |
576 | 608 | |
577 | -bool ClientPaho::isOverlapping(const std::string& topic, std::string& existingTopic) const | |
609 | +bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const | |
578 | 610 | { |
579 | 611 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
580 | - return isOverlappingInternal(topic, existingTopic); | |
612 | + return isOverlappingInternal( topic, existingTopic ); | |
581 | 613 | } |
582 | 614 | |
583 | 615 | std::vector<std::int32_t> ClientPaho::pendingOperations() const |
... | ... | @@ -585,7 +617,7 @@ std::vector<std::int32_t> ClientPaho::pendingOperations() const |
585 | 617 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
586 | 618 | std::vector<std::int32_t> retval{}; |
587 | 619 | retval.resize(m_pendingOperations.size()); |
588 | - std::copy(m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin()); | |
620 | + std::copy( m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin() ); | |
589 | 621 | return retval; |
590 | 622 | } |
591 | 623 | |
... | ... | @@ -595,36 +627,36 @@ bool ClientPaho::hasPendingSubscriptions() const |
595 | 627 | return !m_pendingSubscriptions.empty(); |
596 | 628 | } |
597 | 629 | |
598 | -boost::optional<bool> ClientPaho::operationResult(std::int32_t token) const | |
630 | +boost::optional<bool> ClientPaho::operationResult( std::int32_t token ) const | |
599 | 631 | { |
600 | 632 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
601 | 633 | boost::optional<bool> ret{}; |
602 | - auto cit = m_operationResult.find(token); | |
603 | - if (m_operationResult.end() != cit) | |
634 | + auto cit = m_operationResult.find( token ); | |
635 | + if( m_operationResult.end() != cit ) | |
604 | 636 | { |
605 | 637 | ret = cit->second; |
606 | 638 | } |
607 | 639 | return ret; |
608 | 640 | } |
609 | 641 | |
610 | -void ClientPaho::parseEndpoint(const std::string& _endpoint) | |
642 | +void ClientPaho::parseEndpoint( const std::string& _endpoint ) | |
611 | 643 | { |
612 | - auto ep = UriParser::parse(_endpoint); | |
613 | - if (ep.find("user") != ep.end()) | |
644 | + auto ep = UriParser::parse( _endpoint ); | |
645 | + if( ep.find( "user" ) != ep.end() ) | |
614 | 646 | { |
615 | 647 | m_username = ep["user"]; |
616 | 648 | ep["user"].clear(); |
617 | 649 | } |
618 | 650 | |
619 | - if (ep.find("password") != ep.end()) | |
651 | + if( ep.find( "password" ) != ep.end() ) | |
620 | 652 | { |
621 | 653 | m_password = ep["password"]; |
622 | 654 | ep["password"].clear(); |
623 | 655 | } |
624 | - m_endpoint = UriParser::toString(ep); | |
656 | + m_endpoint = UriParser::toString( ep ); | |
625 | 657 | } |
626 | 658 | |
627 | -std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos) | |
659 | +std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos ) | |
628 | 660 | { |
629 | 661 | MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; |
630 | 662 | opts.onSuccess = &ClientPaho::onPublishSuccess; |
... | ... | @@ -637,21 +669,21 @@ std::int32_t ClientPaho::publishInternal(const MqttMessage& message, int qos) |
637 | 669 | // the insertion of the token into the pending operations. |
638 | 670 | |
639 | 671 | // OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
640 | - auto rc = MQTTAsync_sendMessage(m_client, message.topic().c_str(), &msg, &opts); | |
641 | - if (MQTTASYNC_SUCCESS != rc) | |
672 | + auto rc = MQTTAsync_sendMessage( m_client, message.topic().c_str(), &msg, &opts ); | |
673 | + if( MQTTASYNC_SUCCESS != rc ) | |
642 | 674 | { |
643 | - // ("ClientPaho", "%1 - publish on topic %2 failed with code %3", m_clientId, message.topic(), pahoAsyncErrorCodeToString(rc)); | |
675 | + LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); | |
644 | 676 | } |
645 | 677 | |
646 | - if (!m_pendingOperations.insert(opts.token).second) | |
678 | + if( !m_pendingOperations.insert( opts.token ).second ) | |
647 | 679 | { |
648 | - // ("ClientPaho", "%1 publishInternal - token %2 already in use", m_clientId, opts.token); | |
680 | + LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) ); | |
649 | 681 | } |
650 | - m_operationResult.erase(opts.token); | |
682 | + m_operationResult.erase( opts.token ); | |
651 | 683 | return opts.token; |
652 | 684 | } |
653 | 685 | |
654 | -std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos) | |
686 | +std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos ) | |
655 | 687 | { |
656 | 688 | MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; |
657 | 689 | opts.onSuccess = &ClientPaho::onSubscribeSuccess; |
... | ... | @@ -661,52 +693,51 @@ std::int32_t ClientPaho::subscribeInternal(const std::string& topic, int qos) |
661 | 693 | // Need to lock the mutex because it is possible that the callback is faster than |
662 | 694 | // the insertion of the token into the pending operations. |
663 | 695 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
664 | - auto rc = MQTTAsync_subscribe(m_client, topic.c_str(), qos, &opts); | |
696 | + auto rc = MQTTAsync_subscribe( m_client, topic.c_str(), qos, &opts ); | |
665 | 697 | if (MQTTASYNC_SUCCESS != rc) |
666 | 698 | { |
667 | - m_pendingSubscriptions.erase(topic); | |
668 | - // ("ClientPaho", "%1 - subscription on topic %2 failed with code %3", m_clientId, topic, pahoAsyncErrorCodeToString(rc)); | |
669 | - // (MqttException, "Subscription failed"); | |
699 | + m_pendingSubscriptions.erase( topic ); | |
700 | + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) ); | |
670 | 701 | } |
671 | 702 | |
672 | - if (!m_pendingOperations.insert(opts.token).second) | |
703 | + if( !m_pendingOperations.insert( opts.token ).second ) | |
673 | 704 | { |
674 | - // ("ClientPaho", "%1 subscribe - token %2 already in use", m_clientId, opts.token); | |
705 | + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) ); | |
675 | 706 | } |
676 | - m_operationResult.erase(opts.token); | |
677 | - if (m_subscribeTokenToTopic.count(opts.token) > 0) | |
707 | + m_operationResult.erase( opts.token ); | |
708 | + if( m_subscribeTokenToTopic.count( opts.token ) > 0 ) | |
678 | 709 | { |
679 | - // ("ClientPaho", "%1 - overwriting pending subscription on topic %2 with topic %3", m_clientId, m_subscribeTokenToTopic[opts.token], topic); | |
710 | + LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) ); | |
680 | 711 | } |
681 | 712 | m_subscribeTokenToTopic[opts.token] = topic; |
682 | 713 | return opts.token; |
683 | 714 | } |
684 | 715 | |
685 | -void ClientPaho::setConnectionStatus(ConnectionStatus status) | |
716 | +void ClientPaho::setConnectionStatus( ConnectionStatus status ) | |
686 | 717 | { |
687 | 718 | ConnectionStatus curStatus = m_connectionStatus; |
688 | 719 | m_connectionStatus = status; |
689 | - if (status != curStatus && m_connectionStatusCallback) | |
720 | + if( status != curStatus && m_connectionStatusCallback ) | |
690 | 721 | { |
691 | - m_connectionStatusCallback(m_clientId, status); | |
722 | + m_connectionStatusCallback( m_clientId, status ); | |
692 | 723 | } |
693 | 724 | } |
694 | 725 | |
695 | -bool ClientPaho::isOverlappingInternal(const std::string& topic, std::string& existingTopic) const | |
726 | +bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const | |
696 | 727 | { |
697 | 728 | existingTopic.clear(); |
698 | - for (const auto& s : m_pendingSubscriptions) | |
729 | + for( const auto& s : m_pendingSubscriptions ) | |
699 | 730 | { |
700 | - if (testForOverlap(s.first, topic)) | |
731 | + if( testForOverlap( s.first, topic ) ) | |
701 | 732 | { |
702 | 733 | existingTopic = s.first; |
703 | 734 | return true; |
704 | 735 | } |
705 | 736 | } |
706 | 737 | |
707 | - for (const auto& s : m_subscriptions) | |
738 | + for( const auto& s : m_subscriptions ) | |
708 | 739 | { |
709 | - if (testForOverlap(s.first, topic)) | |
740 | + if( testForOverlap( s.first, topic ) ) | |
710 | 741 | { |
711 | 742 | existingTopic = s.first; |
712 | 743 | return true; |
... | ... | @@ -715,87 +746,86 @@ bool ClientPaho::isOverlappingInternal(const std::string& topic, std::string& ex |
715 | 746 | return false; |
716 | 747 | } |
717 | 748 | |
718 | -void ClientPaho::pushIncomingEvent(std::function<void()> ev) | |
749 | +void ClientPaho::pushIncomingEvent( std::function<void()> ev ) | |
719 | 750 | { |
720 | - m_callbackEventQueue.push(ev); | |
751 | + m_callbackEventQueue.push( ev ); | |
721 | 752 | } |
722 | 753 | |
723 | 754 | void ClientPaho::callbackEventHandler() |
724 | 755 | { |
725 | - // ("ClientPaho", "%1 - starting callback event handler", m_clientId); | |
726 | - for (;;) { | |
756 | + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) ); | |
757 | + for( ;; ) | |
758 | + { | |
727 | 759 | std::vector<std::function<void()>> events; |
728 | - if (!m_callbackEventQueue.pop(events)) | |
760 | + if( !m_callbackEventQueue.pop(events) ) | |
729 | 761 | { |
730 | 762 | break; |
731 | 763 | } |
732 | 764 | |
733 | - for (const auto& ev : events) | |
765 | + for( const auto& ev : events ) | |
734 | 766 | { |
735 | 767 | ev(); |
736 | - // ("ClientPaho", "%1 - Exception occurred: %2", m_clientId, mlogicException); | |
737 | 768 | } |
738 | 769 | } |
739 | - // ("ClientPaho", "%1 - leaving callback event handler", m_clientId); | |
770 | + LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - leaving callback event handler" ) ); | |
740 | 771 | } |
741 | - | |
742 | -void ClientPaho::onConnectOnInstance(const std::string& cause) | |
772 | +void ClientPaho::onConnectOnInstance( const std::string& cause ) | |
743 | 773 | { |
744 | - (void)cause; | |
745 | - // toLogFile ("ClientPaho", "onConnectOnInstance %1 - reconnected (cause %2)", m_clientId, cause); | |
774 | + (void) cause; | |
746 | 775 | { |
747 | 776 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
748 | - std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end())); | |
777 | + std::copy( m_subscriptions.begin(), m_subscriptions.end(), std::inserter( m_pendingSubscriptions, m_pendingSubscriptions.end() ) ); | |
749 | 778 | m_subscriptions.clear(); |
750 | 779 | m_processPendingPublishes = true; // all publishes are on hold until publishPending is called. |
751 | 780 | } |
752 | 781 | |
753 | - setConnectionStatus(ConnectionStatus::Connected); | |
782 | + setConnectionStatus( ConnectionStatus::Connected ); | |
754 | 783 | } |
755 | 784 | |
756 | -void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response) | |
785 | +void ClientPaho::onConnectSuccessOnInstance() | |
757 | 786 | { |
758 | - auto connectData = response.connectionData(); | |
759 | - // ("ClientPaho", "onConnectSuccessOnInstance %1 - connected to endpoint %2 (mqtt version %3, session present %4)", | |
760 | - // m_clientId, connectData.serverUri(), connectData.mqttVersion(), connectData.sessionPresent()); | |
761 | 787 | { |
762 | 788 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
763 | 789 | // Register the connect callback that is used in reconnect scenarios. |
764 | - auto rc = MQTTAsync_setConnected(m_client, this, &ClientPaho::onConnect); | |
765 | - if (MQTTASYNC_SUCCESS != rc) | |
790 | + auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect ); | |
791 | + if( MQTTASYNC_SUCCESS != rc ) | |
766 | 792 | { |
767 | - // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the connected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); | |
793 | + LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); | |
768 | 794 | } |
795 | + | |
769 | 796 | // For MQTTV5 |
770 | 797 | //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect); |
771 | 798 | //if (MQTTASYNC_SUCCESS != rc) { |
772 | 799 | // // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); |
773 | 800 | //} |
774 | 801 | // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); |
802 | + | |
775 | 803 | m_operationResult[-100] = true; |
776 | 804 | m_pendingOperations.erase(-100); |
777 | 805 | } |
778 | - setConnectionStatus(ConnectionStatus::Connected); | |
779 | - if (m_connectPromise) | |
806 | + | |
807 | + setConnectionStatus( ConnectionStatus::Connected ); | |
808 | + if( m_connectPromise ) | |
780 | 809 | { |
810 | + LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) ); | |
781 | 811 | m_connectPromise->set_value(); |
782 | 812 | } |
783 | 813 | m_operationsCompleteCV.notify_all(); |
784 | 814 | } |
785 | 815 | |
786 | -void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) | |
816 | +void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response ) | |
787 | 817 | { |
788 | - (void)response; | |
789 | - // ("ClientPaho", "onConnectFailureOnInstance %1 - connection failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); | |
818 | + (void) response; | |
819 | + LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")")); | |
790 | 820 | { |
791 | 821 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
792 | 822 | // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); |
793 | 823 | m_operationResult[-100] = false; |
794 | 824 | m_pendingOperations.erase(-100); |
795 | 825 | } |
796 | - if (ConnectionStatus::ConnectInProgress == m_connectionStatus) | |
826 | + if( ConnectionStatus::ConnectInProgress == m_connectionStatus ) | |
797 | 827 | { |
798 | - setConnectionStatus(ConnectionStatus::Disconnected); | |
828 | + setConnectionStatus( ConnectionStatus::Disconnected ); | |
799 | 829 | } |
800 | 830 | m_operationsCompleteCV.notify_all(); |
801 | 831 | } |
... | ... | @@ -805,9 +835,9 @@ void ClientPaho::onConnectFailureOnInstance(const MqttFailure& response) |
805 | 835 | // MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode)); |
806 | 836 | //} |
807 | 837 | |
808 | -void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&) | |
838 | +void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& ) | |
809 | 839 | { |
810 | - // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - disconnected from endpoint %2", m_clientId, m_endpoint); | |
840 | + LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) ); | |
811 | 841 | { |
812 | 842 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
813 | 843 | m_subscriptions.clear(); |
... | ... | @@ -820,45 +850,46 @@ void ClientPaho::onDisconnectSuccessOnInstance(const MqttSuccess&) |
820 | 850 | m_pendingOperations.clear(); |
821 | 851 | } |
822 | 852 | |
823 | - setConnectionStatus(ConnectionStatus::Disconnected); | |
853 | + setConnectionStatus( ConnectionStatus::Disconnected ); | |
824 | 854 | |
825 | - if (m_disconnectPromise) { | |
855 | + if( m_disconnectPromise ) | |
856 | + { | |
826 | 857 | m_disconnectPromise->set_value(); |
827 | 858 | } |
828 | 859 | m_operationsCompleteCV.notify_all(); |
829 | 860 | } |
830 | 861 | |
831 | -void ClientPaho::onDisconnectFailureOnInstance(const MqttFailure& response) | |
862 | +void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response ) | |
832 | 863 | { |
833 | - (void)response; | |
834 | - // ("ClientPaho", "onDisconnectFailureOnInstance %1 - disconnect failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); | |
864 | + (void) response; | |
865 | + LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) ); | |
835 | 866 | { |
836 | - OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | |
867 | + OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | |
837 | 868 | // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations); |
838 | 869 | m_operationResult[-200] = false; |
839 | 870 | m_pendingOperations.erase(-200); |
840 | 871 | } |
841 | 872 | |
842 | - if (MQTTAsync_isConnected(m_client)) | |
873 | + if( MQTTAsync_isConnected( m_client ) ) | |
843 | 874 | { |
844 | - setConnectionStatus(ConnectionStatus::Connected); | |
875 | + setConnectionStatus( ConnectionStatus::Connected ); | |
845 | 876 | } |
846 | 877 | else |
847 | 878 | { |
848 | - setConnectionStatus(ConnectionStatus::Disconnected); | |
879 | + setConnectionStatus( ConnectionStatus::Disconnected ); | |
849 | 880 | } |
850 | 881 | |
851 | - if (m_disconnectPromise) | |
882 | + if( m_disconnectPromise ) | |
852 | 883 | { |
853 | 884 | m_disconnectPromise->set_value(); |
854 | 885 | } |
855 | 886 | m_operationsCompleteCV.notify_all(); |
856 | 887 | } |
857 | 888 | |
858 | -void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess& response) | |
889 | +void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response ) | |
859 | 890 | { |
860 | 891 | auto pd = response.publishData(); |
861 | - // ("ClientPaho", "onPublishSuccessOnInstance %1 - publish with token %2 succeeded (message was %3)", m_clientId, response.token(), pd.payload()); | |
892 | + LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) ); | |
862 | 893 | { |
863 | 894 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
864 | 895 | // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); |
... | ... | @@ -868,9 +899,9 @@ void ClientPaho::onPublishSuccessOnInstance(const MqttSuccess& response) |
868 | 899 | m_operationsCompleteCV.notify_all(); |
869 | 900 | } |
870 | 901 | |
871 | -void ClientPaho::onPublishFailureOnInstance(const MqttFailure& response) | |
902 | +void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response ) | |
872 | 903 | { |
873 | - // ("ClientPaho", "onPublishFailureOnInstance %1 - publish with token %2 failed with code %3 (%4)", m_clientId, response.token(), response.codeToString(), response.message()); | |
904 | + LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); | |
874 | 905 | { |
875 | 906 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
876 | 907 | // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); |
... | ... | @@ -880,78 +911,82 @@ void ClientPaho::onPublishFailureOnInstance(const MqttFailure& response) |
880 | 911 | m_operationsCompleteCV.notify_all(); |
881 | 912 | } |
882 | 913 | |
883 | -void ClientPaho::onSubscribeSuccessOnInstance(const MqttSuccess& response) | |
914 | +void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response ) | |
884 | 915 | { |
885 | - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscribe with token %2 succeeded", m_clientId, response.token()); | |
886 | - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); | |
916 | + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) ); | |
917 | + | |
918 | + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); | |
887 | 919 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
888 | 920 | bool operationOk = false; |
889 | - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response, &operationOk]() | |
921 | + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]() | |
890 | 922 | { |
891 | 923 | // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); |
892 | 924 | m_operationResult[response.token()] = operationOk; |
893 | - m_pendingOperations.erase(response.token()); | |
925 | + m_pendingOperations.erase( response.token() ); | |
894 | 926 | }); |
895 | - auto it = m_subscribeTokenToTopic.find(response.token()); | |
896 | - if (m_subscribeTokenToTopic.end() == it) { | |
897 | - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, response.token()); | |
927 | + auto it = m_subscribeTokenToTopic.find( response.token() ); | |
928 | + if( m_subscribeTokenToTopic.end() == it ) | |
929 | + { | |
930 | + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); | |
898 | 931 | return; |
899 | 932 | } |
900 | 933 | auto topic = it->second; |
901 | 934 | m_subscribeTokenToTopic.erase(it); |
902 | 935 | |
903 | - auto pendingIt = m_pendingSubscriptions.find(topic); | |
904 | - if (m_pendingSubscriptions.end() == pendingIt) | |
936 | + auto pendingIt = m_pendingSubscriptions.find( topic ); | |
937 | + if( m_pendingSubscriptions.end() == pendingIt ) | |
905 | 938 | { |
906 | - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); | |
939 | + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) ); | |
907 | 940 | return; |
908 | 941 | } |
909 | - if (response.qos() != pendingIt->second.qos) | |
942 | + if( response.qos() != pendingIt->second.qos ) | |
910 | 943 | { |
911 | - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - subscription requested qos %2, endpoint assigned qos %3", m_clientId, pendingIt->second.qos, response.qos()); | |
944 | + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) ); | |
912 | 945 | } |
913 | - // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - move pending subscription on topic %2 to the registered subscriptions", m_clientId, topic); | |
914 | - m_subscriptions.emplace(std::make_pair(pendingIt->first, std::move(pendingIt->second))); | |
915 | - m_pendingSubscriptions.erase(pendingIt); | |
946 | + | |
947 | + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) ); | |
948 | + m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) ); | |
949 | + m_pendingSubscriptions.erase( pendingIt ); | |
916 | 950 | operationOk = true; |
917 | 951 | } |
918 | 952 | |
919 | -void ClientPaho::onSubscribeFailureOnInstance(const MqttFailure& response) | |
953 | +void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response ) | |
920 | 954 | { |
921 | - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); | |
922 | - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); | |
955 | + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); | |
956 | + | |
957 | + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); | |
923 | 958 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
924 | - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() | |
959 | + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]() | |
925 | 960 | { |
926 | 961 | // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); |
927 | 962 | m_operationResult[response.token()] = false; |
928 | - m_pendingOperations.erase(response.token()); | |
963 | + m_pendingOperations.erase( response.token() ); | |
929 | 964 | }); |
930 | 965 | |
931 | - auto it = m_subscribeTokenToTopic.find(response.token()); | |
932 | - if (m_subscribeTokenToTopic.end() == it) | |
966 | + auto it = m_subscribeTokenToTopic.find( response.token() ); | |
967 | + if( m_subscribeTokenToTopic.end() == it ) | |
933 | 968 | { |
934 | - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); | |
969 | + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); | |
935 | 970 | return; |
936 | 971 | } |
937 | 972 | auto topic = it->second; |
938 | - m_subscribeTokenToTopic.erase(it); | |
973 | + m_subscribeTokenToTopic.erase( it ); | |
939 | 974 | |
940 | - auto pendingIt = m_pendingSubscriptions.find(topic); | |
941 | - if (m_pendingSubscriptions.end() == pendingIt) | |
975 | + auto pendingIt = m_pendingSubscriptions.find( topic ); | |
976 | + if( m_pendingSubscriptions.end() == pendingIt ) | |
942 | 977 | { |
943 | - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - cannot find pending subscription for token %2", m_clientId, response.token()); | |
978 | + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) ); | |
944 | 979 | return; |
945 | 980 | } |
946 | - // ("ClientPaho", "onSubscribeFailureOnInstance %1 - remove pending subscription on topic %2", m_clientId, topic); | |
947 | - m_pendingSubscriptions.erase(pendingIt); | |
981 | + LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) ); | |
982 | + m_pendingSubscriptions.erase( pendingIt ); | |
948 | 983 | } |
949 | 984 | |
950 | -void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response) | |
985 | +void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response ) | |
951 | 986 | { |
952 | - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unsubscribe with token %2 succeeded", m_clientId, response.token()); | |
987 | + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) ); | |
953 | 988 | |
954 | - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); | |
989 | + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); | |
955 | 990 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
956 | 991 | |
957 | 992 | // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token. |
... | ... | @@ -960,116 +995,117 @@ void ClientPaho::onUnsubscribeSuccessOnInstance(const MqttSuccess& response) |
960 | 995 | // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled |
961 | 996 | // sequentially (see ClientPaho::unsubscribe)! |
962 | 997 | auto token = response.token(); |
963 | - if (-1 == token) | |
998 | + if( -1 == token ) | |
964 | 999 | { |
965 | 1000 | token = m_lastUnsubscribe; |
966 | 1001 | m_lastUnsubscribe = -1; |
967 | 1002 | } |
968 | 1003 | |
969 | 1004 | bool operationOk = false; |
970 | - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, token, &operationOk]() | |
1005 | + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]() | |
971 | 1006 | { |
972 | 1007 | // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token); |
973 | 1008 | m_operationResult[token] = operationOk; |
974 | - m_pendingOperations.erase(token); | |
1009 | + m_pendingOperations.erase( token ); | |
975 | 1010 | }); |
976 | 1011 | |
977 | - auto it = m_unsubscribeTokenToTopic.find(token); | |
978 | - if (m_unsubscribeTokenToTopic.end() == it) | |
1012 | + auto it = m_unsubscribeTokenToTopic.find( token ); | |
1013 | + if( m_unsubscribeTokenToTopic.end() == it ) | |
979 | 1014 | { |
980 | - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - unknown token %2", m_clientId, token); | |
1015 | + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) ); | |
981 | 1016 | return; |
982 | 1017 | } |
983 | 1018 | auto topic = it->second; |
984 | - m_unsubscribeTokenToTopic.erase(it); | |
1019 | + m_unsubscribeTokenToTopic.erase( it ); | |
985 | 1020 | |
986 | - auto registeredIt = m_subscriptions.find(topic); | |
987 | - if (m_subscriptions.end() == registeredIt) { | |
988 | - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - cannot find subscription for token %2", m_clientId, response.token()); | |
1021 | + auto registeredIt = m_subscriptions.find( topic ); | |
1022 | + if( m_subscriptions.end() == registeredIt ) | |
1023 | + { | |
1024 | + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) ); | |
989 | 1025 | return; |
990 | 1026 | } |
991 | - // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - remove subscription on topic %2 from the registered subscriptions", m_clientId, topic); | |
992 | - m_subscriptions.erase(registeredIt); | |
1027 | + | |
1028 | + LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) ); | |
1029 | + m_subscriptions.erase( registeredIt ); | |
993 | 1030 | operationOk = true; |
994 | 1031 | } |
995 | 1032 | |
996 | -void ClientPaho::onUnsubscribeFailureOnInstance(const MqttFailure& response) | |
1033 | +void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response ) | |
997 | 1034 | { |
998 | - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - subscription failed with code %2 (%3)", m_clientId, response.codeToString(), response.message()); | |
999 | - OSDEV_COMPONENTS_SCOPEGUARD(m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); }); | |
1035 | + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) ); | |
1036 | + OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } ); | |
1000 | 1037 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
1001 | - OSDEV_COMPONENTS_SCOPEGUARD(m_pendingOperations, [this, &response]() | |
1038 | + OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]() | |
1002 | 1039 | { |
1003 | 1040 | // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token()); |
1004 | 1041 | m_operationResult[response.token()] = false; |
1005 | - m_pendingOperations.erase(response.token()); | |
1042 | + m_pendingOperations.erase( response.token() ); | |
1006 | 1043 | }); |
1007 | 1044 | |
1008 | - auto it = m_unsubscribeTokenToTopic.find(response.token()); | |
1009 | - if (m_unsubscribeTokenToTopic.end() == it) | |
1045 | + auto it = m_unsubscribeTokenToTopic.find( response.token() ); | |
1046 | + if( m_unsubscribeTokenToTopic.end() == it ) | |
1010 | 1047 | { |
1011 | - // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - unknown token %2", m_clientId, response.token()); | |
1048 | + LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) ); | |
1012 | 1049 | return; |
1013 | 1050 | } |
1014 | 1051 | auto topic = it->second; |
1015 | - m_unsubscribeTokenToTopic.erase(it); | |
1052 | + m_unsubscribeTokenToTopic.erase( it ); | |
1016 | 1053 | } |
1017 | 1054 | |
1018 | -int ClientPaho::onMessageArrivedOnInstance(const MqttMessage& message) | |
1055 | +int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message ) | |
1019 | 1056 | { |
1020 | - // ("ClientPaho", "onMessageArrivedOnInstance %1 - received message on topic %2, retained : %3, dup : %4", m_clientId, message.topic(), message.retained(), message.duplicate()); | |
1021 | - | |
1022 | - std::function<void(MqttMessage)> cb; | |
1057 | + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) ); | |
1023 | 1058 | |
1059 | + std::function<void( MqttMessage )> cb; | |
1024 | 1060 | { |
1025 | 1061 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
1026 | - for (const auto& s : m_subscriptions) | |
1062 | + for( const auto& s : m_subscriptions ) | |
1027 | 1063 | { |
1028 | - if (boost::regex_match(message.topic(), s.second.topicRegex)) | |
1064 | + if( boost::regex_match( message.topic(), s.second.topicRegex ) ) | |
1029 | 1065 | { |
1030 | 1066 | cb = s.second.callback; |
1031 | 1067 | } |
1032 | 1068 | } |
1033 | 1069 | } |
1034 | 1070 | |
1035 | - if (cb) | |
1071 | + if( cb ) | |
1036 | 1072 | { |
1037 | - cb(message); | |
1073 | + cb( message ); | |
1038 | 1074 | } |
1039 | 1075 | else |
1040 | 1076 | { |
1041 | - // ("ClientPaho", "onMessageArrivedOnInstance %1 - no topic filter found for message received on topic %2", m_clientId, message.topic()); | |
1077 | + LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) ); | |
1042 | 1078 | } |
1043 | 1079 | return 1; |
1044 | 1080 | } |
1045 | 1081 | |
1046 | -void ClientPaho::onDeliveryCompleteOnInstance(MQTTAsync_token token) | |
1082 | +void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token ) | |
1047 | 1083 | { |
1048 | - // ("ClientPaho", "onDeliveryCompleteOnInstance %1 - message with token %2 is delivered", m_clientId, token); | |
1049 | - if (m_deliveryCompleteCallback) | |
1084 | + LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) ); | |
1085 | + if( m_deliveryCompleteCallback ) | |
1050 | 1086 | { |
1051 | - m_deliveryCompleteCallback(m_clientId, static_cast<std::int32_t>(token)); | |
1087 | + m_deliveryCompleteCallback( m_clientId, static_cast<std::int32_t>( token ) ); | |
1052 | 1088 | } |
1053 | 1089 | } |
1054 | 1090 | |
1055 | -void ClientPaho::onConnectionLostOnInstance(const std::string& cause) | |
1091 | +void ClientPaho::onConnectionLostOnInstance( const std::string& cause ) | |
1056 | 1092 | { |
1057 | - (void)cause; | |
1093 | + (void) cause; | |
1058 | 1094 | // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause); |
1059 | - setConnectionStatus(ConnectionStatus::ReconnectInProgress); | |
1095 | + setConnectionStatus( ConnectionStatus::ReconnectInProgress ); | |
1060 | 1096 | |
1061 | 1097 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
1062 | 1098 | // Remove all tokens related to subscriptions from the active operations. |
1063 | - for (const auto& p : m_subscribeTokenToTopic) | |
1099 | + for( const auto& p : m_subscribeTokenToTopic ) | |
1064 | 1100 | { |
1065 | 1101 | // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); |
1066 | - m_pendingOperations.erase(p.first); | |
1102 | + m_pendingOperations.erase( p.first ); | |
1067 | 1103 | } |
1068 | 1104 | |
1069 | - for (const auto& p : m_unsubscribeTokenToTopic) | |
1105 | + for( const auto& p : m_unsubscribeTokenToTopic ) | |
1070 | 1106 | { |
1071 | 1107 | // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first); |
1072 | - m_pendingOperations.erase(p.first); | |
1108 | + m_pendingOperations.erase( p.first ); | |
1073 | 1109 | } |
1074 | 1110 | // Clear the administration used in the subscribe process. |
1075 | 1111 | m_subscribeTokenToTopic.clear(); |
... | ... | @@ -1077,39 +1113,55 @@ void ClientPaho::onConnectionLostOnInstance(const std::string& cause) |
1077 | 1113 | } |
1078 | 1114 | |
1079 | 1115 | // static |
1080 | -void ClientPaho::onConnect(void* context, char* cause) | |
1116 | +void ClientPaho::onFirstConnect( void* context, char* cause ) | |
1081 | 1117 | { |
1082 | - if (context) | |
1118 | + LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." ); | |
1119 | + if( context ) | |
1083 | 1120 | { |
1084 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1085 | - std::string reason(nullptr == cause ? "unknown cause" : cause); | |
1086 | - cl->pushIncomingEvent([cl, reason]() { cl->onConnectOnInstance(reason); }); | |
1121 | + auto *cl = reinterpret_cast<ClientPaho*>( context ); | |
1122 | + std::string reason( nullptr == cause ? "Unknown cause" : cause ); | |
1123 | + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } ); | |
1124 | + } | |
1125 | +} | |
1126 | + | |
1127 | +void ClientPaho::onConnect( void* context, char* cause ) | |
1128 | +{ | |
1129 | + LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." ); | |
1130 | + if( context ) | |
1131 | + { | |
1132 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1133 | + std::string reason( nullptr == cause ? "unknown cause" : cause ); | |
1134 | + cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } ); | |
1087 | 1135 | } |
1088 | 1136 | } |
1089 | 1137 | |
1090 | 1138 | // static |
1091 | -void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response) | |
1139 | +void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response ) | |
1092 | 1140 | { |
1093 | - if (context) | |
1141 | + LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." ); | |
1142 | + if( context ) | |
1094 | 1143 | { |
1095 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1096 | - if (!response) { | |
1144 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1145 | + if( !response ) | |
1146 | + { | |
1097 | 1147 | // connect should always have a valid response struct. |
1098 | - // ("ClientPaho", "onConnectSuccess - no response data"); | |
1148 | + LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" ); | |
1149 | + return; | |
1099 | 1150 | } |
1100 | - MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent)); | |
1101 | - cl->pushIncomingEvent([cl, resp]() { cl->onConnectSuccessOnInstance(resp); }); | |
1151 | + // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent)); | |
1152 | + cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } ); | |
1102 | 1153 | } |
1103 | 1154 | } |
1104 | 1155 | |
1105 | 1156 | // static |
1106 | -void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response) | |
1157 | +void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response ) | |
1107 | 1158 | { |
1108 | - if (context) | |
1159 | + LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) ); | |
1160 | + if( context ) | |
1109 | 1161 | { |
1110 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1111 | - MqttFailure resp(response); | |
1112 | - cl->pushIncomingEvent([cl, resp]() { cl->onConnectFailureOnInstance(resp); }); | |
1162 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1163 | + MqttFailure resp( response ); | |
1164 | + cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } ); | |
1113 | 1165 | } |
1114 | 1166 | } |
1115 | 1167 | |
... | ... | @@ -1134,34 +1186,35 @@ void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response |
1134 | 1186 | //} |
1135 | 1187 | |
1136 | 1188 | // static |
1137 | -void ClientPaho::onDisconnectSuccess(void* context, MQTTAsync_successData* response) | |
1189 | +void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response ) | |
1138 | 1190 | { |
1139 | - if (context) | |
1191 | + if( context ) | |
1140 | 1192 | { |
1141 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1142 | - MqttSuccess resp(response ? response->token : 0); | |
1143 | - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectSuccessOnInstance(resp); }); | |
1193 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1194 | + MqttSuccess resp( response ? response->token : 0 ); | |
1195 | + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } ); | |
1144 | 1196 | } |
1145 | 1197 | } |
1146 | 1198 | |
1147 | 1199 | // static |
1148 | -void ClientPaho::onDisconnectFailure(void* context, MQTTAsync_failureData* response) | |
1200 | +void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response ) | |
1149 | 1201 | { |
1150 | - if (context) | |
1202 | + LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." ); | |
1203 | + if( context ) | |
1151 | 1204 | { |
1152 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1153 | - MqttFailure resp(response); | |
1154 | - cl->pushIncomingEvent([cl, resp]() { cl->onDisconnectFailureOnInstance(resp); }); | |
1205 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1206 | + MqttFailure resp( response ); | |
1207 | + cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } ); | |
1155 | 1208 | } |
1156 | 1209 | } |
1157 | 1210 | |
1158 | 1211 | // static |
1159 | -void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response) | |
1212 | +void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response ) | |
1160 | 1213 | { |
1161 | - if (context) | |
1214 | + if( context ) | |
1162 | 1215 | { |
1163 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1164 | - if (!response) | |
1216 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1217 | + if( !response ) | |
1165 | 1218 | { |
1166 | 1219 | // publish should always have a valid response struct. |
1167 | 1220 | // toLogFile ("ClientPaho", "onPublishSuccess - no response data"); |
... | ... | @@ -1172,134 +1225,137 @@ void ClientPaho::onPublishSuccess(void* context, MQTTAsync_successData* response |
1172 | 1225 | } |
1173 | 1226 | |
1174 | 1227 | // static |
1175 | -void ClientPaho::onPublishFailure(void* context, MQTTAsync_failureData* response) | |
1228 | +void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response ) | |
1176 | 1229 | { |
1177 | - (void)response; | |
1178 | - if (context) | |
1230 | + (void) response; | |
1231 | + if( context ) | |
1179 | 1232 | { |
1180 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1181 | - MqttFailure resp(response); | |
1182 | - cl->pushIncomingEvent([cl, resp]() { cl->onPublishFailureOnInstance(resp); }); | |
1233 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1234 | + MqttFailure resp( response ); | |
1235 | + cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } ); | |
1183 | 1236 | } |
1184 | 1237 | } |
1185 | 1238 | |
1186 | 1239 | // static |
1187 | -void ClientPaho::onSubscribeSuccess(void* context, MQTTAsync_successData* response) | |
1240 | +void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response ) | |
1188 | 1241 | { |
1189 | - if (context) | |
1242 | + if( context ) | |
1190 | 1243 | { |
1191 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1192 | - if (!response) | |
1244 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1245 | + if( !response ) | |
1193 | 1246 | { |
1194 | 1247 | // subscribe should always have a valid response struct. |
1195 | 1248 | // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data"); |
1196 | 1249 | } |
1197 | - MqttSuccess resp(response->token, response->alt.qos); | |
1198 | - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeSuccessOnInstance(resp); }); | |
1250 | + MqttSuccess resp( response->token, response->alt.qos ); | |
1251 | + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } ); | |
1199 | 1252 | } |
1200 | 1253 | } |
1201 | 1254 | |
1202 | 1255 | // static |
1203 | -void ClientPaho::onSubscribeFailure(void* context, MQTTAsync_failureData* response) | |
1256 | +void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response ) | |
1204 | 1257 | { |
1205 | - if (context) | |
1258 | + if( context ) | |
1206 | 1259 | { |
1207 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1208 | - MqttFailure resp(response); | |
1209 | - cl->pushIncomingEvent([cl, resp]() { cl->onSubscribeFailureOnInstance(resp); }); | |
1260 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1261 | + MqttFailure resp( response ); | |
1262 | + cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } ); | |
1210 | 1263 | } |
1211 | 1264 | } |
1212 | 1265 | |
1213 | 1266 | // static |
1214 | -void ClientPaho::onUnsubscribeSuccess(void* context, MQTTAsync_successData* response) | |
1267 | +void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response ) | |
1215 | 1268 | { |
1216 | - if (context) | |
1269 | + if( context ) | |
1217 | 1270 | { |
1218 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1219 | - MqttSuccess resp(response ? response->token : -1); | |
1220 | - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeSuccessOnInstance(resp); }); | |
1271 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1272 | + MqttSuccess resp( response ? response->token : -1 ); | |
1273 | + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } ); | |
1221 | 1274 | } |
1222 | 1275 | } |
1223 | 1276 | |
1224 | 1277 | // static |
1225 | -void ClientPaho::onUnsubscribeFailure(void* context, MQTTAsync_failureData* response) | |
1278 | +void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response ) | |
1226 | 1279 | { |
1227 | - if (context) | |
1280 | + if( context ) | |
1228 | 1281 | { |
1229 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1230 | - MqttFailure resp(response); | |
1231 | - cl->pushIncomingEvent([cl, resp]() { cl->onUnsubscribeFailureOnInstance(resp); }); | |
1282 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1283 | + MqttFailure resp( response ); | |
1284 | + cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } ); | |
1232 | 1285 | } |
1233 | 1286 | } |
1234 | 1287 | |
1235 | 1288 | // static |
1236 | -int ClientPaho::onMessageArrived(void* context, char* topicName, int, MQTTAsync_message* message) | |
1289 | +int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message ) | |
1237 | 1290 | { |
1238 | 1291 | |
1239 | - OSDEV_COMPONENTS_SCOPEGUARD(freeMessage, [&topicName, &message]() | |
1292 | + OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]() | |
1240 | 1293 | { |
1241 | - MQTTAsync_freeMessage(&message); | |
1242 | - MQTTAsync_free(topicName); | |
1294 | + MQTTAsync_freeMessage( &message ); | |
1295 | + MQTTAsync_free( topicName ); | |
1243 | 1296 | }); |
1244 | 1297 | |
1245 | - if (context) | |
1298 | + if( context ) | |
1246 | 1299 | { |
1247 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1248 | - MqttMessage msg(topicName, *message); | |
1249 | - cl->pushIncomingEvent([cl, msg]() { cl->onMessageArrivedOnInstance(msg); }); | |
1300 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1301 | + MqttMessage msg( topicName, *message ); | |
1302 | + cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } ); | |
1250 | 1303 | } |
1251 | 1304 | |
1252 | 1305 | return 1; // always return true. Otherwise this callback is triggered again. |
1253 | 1306 | } |
1254 | 1307 | |
1255 | 1308 | // static |
1256 | -void ClientPaho::onDeliveryComplete(void* context, MQTTAsync_token token) | |
1309 | +void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token ) | |
1257 | 1310 | { |
1258 | - if (context) | |
1311 | + if( context ) | |
1259 | 1312 | { |
1260 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1261 | - cl->pushIncomingEvent([cl, token]() { cl->onDeliveryCompleteOnInstance(token); }); | |
1313 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1314 | + cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } ); | |
1262 | 1315 | } |
1263 | 1316 | } |
1264 | 1317 | |
1265 | 1318 | // static |
1266 | -void ClientPaho::onConnectionLost(void* context, char* cause) | |
1319 | +void ClientPaho::onConnectionLost( void* context, char* cause ) | |
1267 | 1320 | { |
1268 | - OSDEV_COMPONENTS_SCOPEGUARD(freeCause, [&cause]() | |
1321 | + OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]() | |
1269 | 1322 | { |
1270 | - if (cause) | |
1323 | + if( cause ) | |
1271 | 1324 | { |
1272 | - MQTTAsync_free(cause); | |
1325 | + MQTTAsync_free( cause ); | |
1273 | 1326 | } |
1274 | 1327 | }); |
1275 | 1328 | |
1276 | - if (context) | |
1329 | + if( context ) | |
1277 | 1330 | { |
1278 | - auto* cl = reinterpret_cast<ClientPaho*>(context); | |
1279 | - std::string msg(nullptr == cause ? "cause unknown" : cause); | |
1280 | - cl->pushIncomingEvent([cl, msg]() { cl->onConnectionLostOnInstance(msg); }); | |
1331 | + auto* cl = reinterpret_cast<ClientPaho*>( context ); | |
1332 | + std::string msg( nullptr == cause ? "cause unknown" : cause ); | |
1333 | + cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } ); | |
1281 | 1334 | } |
1282 | 1335 | } |
1283 | 1336 | |
1284 | 1337 | // static |
1285 | -void ClientPaho::onLogPaho(enum MQTTASYNC_TRACE_LEVELS level, char* message) | |
1338 | +void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message ) | |
1286 | 1339 | { |
1287 | - (void)message; | |
1288 | - switch (level) | |
1340 | + (void) message; | |
1341 | + switch( level ) | |
1289 | 1342 | { |
1290 | 1343 | case MQTTASYNC_TRACE_MAXIMUM: |
1291 | 1344 | case MQTTASYNC_TRACE_MEDIUM: |
1292 | - case MQTTASYNC_TRACE_MINIMUM: { | |
1345 | + case MQTTASYNC_TRACE_MINIMUM: | |
1346 | + { | |
1293 | 1347 | // ("ClientPaho", "paho - %1", message) |
1294 | 1348 | break; |
1295 | 1349 | } |
1296 | - case MQTTASYNC_TRACE_PROTOCOL: { | |
1350 | + case MQTTASYNC_TRACE_PROTOCOL: | |
1351 | + { | |
1297 | 1352 | // ("ClientPaho", "paho - %1", message) |
1298 | 1353 | break; |
1299 | 1354 | } |
1300 | 1355 | case MQTTASYNC_TRACE_ERROR: |
1301 | 1356 | case MQTTASYNC_TRACE_SEVERE: |
1302 | - case MQTTASYNC_TRACE_FATAL: { | |
1357 | + case MQTTASYNC_TRACE_FATAL: | |
1358 | + { | |
1303 | 1359 | // ("ClientPaho", "paho - %1", message) |
1304 | 1360 | break; |
1305 | 1361 | } | ... | ... |
src/connectionstatus.cpp
... | ... | @@ -38,5 +38,5 @@ std::ostream& operator<<(std::ostream &os, ConnectionStatus rhs) |
38 | 38 | case ConnectionStatus::Connected: |
39 | 39 | return os << "Connected"; |
40 | 40 | } |
41 | - return os << "Unknown"; // Should never be reached. | |
41 | + return os << "Unknown"; // "Should" never be reached. ( But guess what? ) | |
42 | 42 | } | ... | ... |
src/log.cpp
... | ... | @@ -53,7 +53,7 @@ int toInt( LogLevel level ) |
53 | 53 | |
54 | 54 | std::string Log::s_context = std::string(); |
55 | 55 | std::string Log::s_fileName = std::string(); |
56 | -LogLevel Log::s_logLevel = LogLevel::Error; | |
56 | +LogLevel Log::s_logLevel = LogLevel::Debug; | |
57 | 57 | LogMask Log::s_logMask = LogMask::None; |
58 | 58 | |
59 | 59 | void Log::init( const std::string& context, const std::string& logFile, LogLevel logDepth ) | ... | ... |
src/mqttclient.cpp
... | ... | @@ -125,7 +125,7 @@ StateEnum MqttClient::state() const |
125 | 125 | return m_serverState.state(); |
126 | 126 | } |
127 | 127 | |
128 | -void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt ) | |
128 | +void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt, bool blocking ) | |
129 | 129 | { |
130 | 130 | osdev::components::mqtt::ParsedUri _endpoint = { |
131 | 131 | { "scheme", "tcp" }, |
... | ... | @@ -135,10 +135,10 @@ void MqttClient::connect(const std::string& host, int port, const Credentials &c |
135 | 135 | { "port", std::to_string(port) } |
136 | 136 | }; |
137 | 137 | |
138 | - this->connect( UriParser::toString( _endpoint ), lwt ); | |
138 | + this->connect( UriParser::toString( _endpoint ), lwt, blocking ); | |
139 | 139 | } |
140 | 140 | |
141 | -void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt ) | |
141 | +void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt, bool blocking ) | |
142 | 142 | { |
143 | 143 | LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) ); |
144 | 144 | |
... | ... | @@ -171,7 +171,7 @@ void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt ) |
171 | 171 | client = m_principalClient.get(); |
172 | 172 | } |
173 | 173 | |
174 | - client->connect( true, lwt ); | |
174 | + client->connect( blocking, lwt ); | |
175 | 175 | } |
176 | 176 | |
177 | 177 | void MqttClient::disconnect() |
... | ... | @@ -254,12 +254,15 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi |
254 | 254 | // throw (?)(MqttException, "Not connected"); |
255 | 255 | return Token(m_clientId, -1); |
256 | 256 | } |
257 | - if (!m_principalClient->isOverlapping(topic)) { | |
257 | + if (!m_principalClient->isOverlapping(topic)) | |
258 | + { | |
258 | 259 | client = m_principalClient.get(); |
259 | 260 | clientFound = true; |
260 | 261 | } |
261 | - else { | |
262 | - for (const auto& c : m_additionalClients) { | |
262 | + else | |
263 | + { | |
264 | + for (const auto& c : m_additionalClients) | |
265 | + { | |
263 | 266 | if (!c->isOverlapping(topic)) { |
264 | 267 | client = c.get(); |
265 | 268 | clientFound = true; |
... | ... | @@ -267,7 +270,8 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi |
267 | 270 | } |
268 | 271 | } |
269 | 272 | } |
270 | - if (!clientFound) { | |
273 | + if (!clientFound) | |
274 | + { | |
271 | 275 | LogDebug("[MqttClient::subscribe]", std::string( m_clientId + " - Creating new ClientPaho instance for subscription on topic " + topic ) ); |
272 | 276 | std::string derivedClientId(generateUniqueClientId(m_clientId, m_additionalClients.size() + 2)); // principal client is nr 1. |
273 | 277 | m_additionalClients.emplace_back(std::make_unique<ClientPaho>( |
... | ... | @@ -278,7 +282,8 @@ Token MqttClient::subscribe(const std::string& topic, int qos, const std::functi |
278 | 282 | client = m_additionalClients.back().get(); |
279 | 283 | } |
280 | 284 | } |
281 | - if (!clientFound) { | |
285 | + if (!clientFound) | |
286 | + { | |
282 | 287 | client->connect(true); |
283 | 288 | } |
284 | 289 | return Token{ client->clientId(), client->subscribe(topic, qos, cb) }; | ... | ... |