Blame view

src/clientpaho.cpp 48.6 KB
b5d9e433   Peter M. Groen   Fixed License Hea...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  /* ****************************************************************************
   * Copyright 2019 Open Systems Development BV                                 *
   *                                                                            *
   * Permission is hereby granted, free of charge, to any person obtaining a    *
   * copy of this software and associated documentation files (the "Software"), *
   * to deal in the Software without restriction, including without limitation  *
   * the rights to use, copy, modify, merge, publish, distribute, sublicense,   *
   * and/or sell copies of the Software, and to permit persons to whom the      *
   * Software is furnished to do so, subject to the following conditions:       *
   *                                                                            *
   * The above copyright notice and this permission notice shall be included in *
   * all copies or substantial portions of the Software.                        *
   *                                                                            *
   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,   *
   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL    *
   * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING    *
   * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER        *
   * DEALINGS IN THE SOFTWARE.                                                  *
   * ***************************************************************************/
51becbde   Peter M. Groen   Committed the ent...
22
23
24
25
26
  #include "clientpaho.h"
  
  #include "errorcode.h"
  #include "mqttutil.h"
  #include "lockguard.h"
9421324b   Peter M. Groen   First fix on conn...
27
  #include "log.h"
51becbde   Peter M. Groen   Committed the ent...
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
  #include "metaprogrammingdefs.h"
  #include "mqttstream.h"
  #include "scopeguard.h"
  #include "uriparser.h"
  
  // std::chrono
  #include "compat-chrono.h"
  
  // std
  #include <algorithm>
  #include <iterator>
  
  using namespace osdev::components::mqtt;
  
  // File-local helpers that produce a correctly initialized MQTTAsync_disconnectOptions
  // regardless of whether the installed paho-c version has the MQTT v5 callback members.
  namespace {
  
  // The helpers below are templates that may never be instantiated in some configurations;
  // silence clang's unused-template warning for this region only.
  #if defined(__clang__)
  #pragma GCC diagnostic push
  #pragma GCC diagnostic ignored "-Wunused-template"
  #endif
  
  // Presumably defines the has_onSuccess5<T> trait used by the overloads below
  // (detects a member named onSuccess5) - confirm against the macro definition.
  OSDEV_COMPONENTS_HASMEMBER_TRAIT(onSuccess5)
  
  // Overload selected when TRet has no onSuccess5 member: the plain initializer macro is used.
  // The pointer argument is unused; it only exists to select the overload.
  template <typename TRet>
  inline typename std::enable_if<!has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
  {
      return MQTTAsync_disconnectOptions_initializer;
  }
  
  // Overload selected when TRet has an onSuccess5 member.
  // The pointer argument is unused; it only exists to select the overload.
  template <typename TRet>
  inline typename std::enable_if<has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
  {
  // For some reason g++ on centos7 evaluates the function body even when it is discarded by SFINAE.
  // This leads to a compile error on an undefined symbol. We will use the old initializer macro, but this
  // method should not be chosen when the struct does not contain member onSuccess5!
  // On yocto warrior mqtt-paho-c 1.3.0 the macro MQTTAsync_disconnectOptions_initializer5 is not defined,
  // while the struct does have an onSuccess5 member. In that case we do need correct initializer code.
  // We fall back to the MQTTAsync_disconnectOptions_initializer macro and initialize the
  // additional fields ourselves (which unfortunately results in a pesky compiler warning about missing field initializers).
  #ifndef MQTTAsync_disconnectOptions_initializer5
  #pragma GCC diagnostic push
  #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
      TRet ret = MQTTAsync_disconnectOptions_initializer;
      // Fields that the old initializer macro does not set up are assigned explicitly.
      ret.struct_version = 1;
      ret.onSuccess5 = nullptr;
      ret.onFailure5 = nullptr;
      return ret;
  #pragma GCC diagnostic pop
  #else
      return MQTTAsync_disconnectOptions_initializer5;
  #endif
  }
  
  // Convenience wrapper: Init<T>::initialize() returns an initialized T by dispatching
  // to the matching initializeMqttStruct overload.
  template <typename TRet>
  struct Init
  {
      static TRet initialize()
      {
          // The null pointer only carries the type for overload selection; it is never dereferenced here.
          return initializeMqttStruct<TRet>(static_cast<TRet*>(nullptr));
      }
  };
  #if defined(__clang__)
  #pragma GCC diagnostic pop
  #endif
  
  } // namespace
  
  // Instance counter: the constructor installs the paho trace callback when it goes from 0 to 1,
  // the destructor removes the callback again when it drops back to 0.
  std::atomic_int ClientPaho::s_numberOfInstances(0);
  
ca0cf29e   Steven   syntax fixes, con...
97
  // Create a client for the given endpoint.
  //
  // _endpoint                - endpoint description, handed to parseEndpoint().
  // _id                      - client id, stored in m_clientId and passed to MQTTAsync_create.
  // connectionStatusCallback - stored in m_connectionStatusCallback.
  // deliveryCompleteCallback - stored in m_deliveryCompleteCallback.
  //
  // The first live instance installs the paho trace callback. When the paho client handle
  // is created successfully the paho callbacks are registered and the worker thread that runs
  // callbackEventHandler() is started; otherwise only an error is logged.
  ClientPaho::ClientPaho( const std::string& _endpoint,
      const std::string& _id,
      const std::function<void( const std::string&, ConnectionStatus )>& connectionStatusCallback,
      const std::function<void( const std::string& clientId, std::int32_t pubMsgToken )>& deliveryCompleteCallback )
      : m_mutex()
      , m_endpoint()
      , m_username()
      , m_password()
      , m_clientId(_id)
      , m_pendingOperations()
      , m_operationResult()
      , m_operationsCompleteCV()
      , m_subscriptions()
      , m_pendingSubscriptions()
      , m_subscribeTokenToTopic()
      , m_unsubscribeTokenToTopic()
      , m_pendingPublishes()
      , m_processPendingPublishes(false)
      , m_pendingPublishesReadyCV()
      , m_client()
      , m_connectionStatus(ConnectionStatus::Disconnected)
      , m_connectionStatusCallback(connectionStatusCallback)
      , m_deliveryCompleteCallback(deliveryCompleteCallback)
      , m_lastUnsubscribe(-1)
      , m_connectPromise()
      , m_disconnectPromise()
      , m_callbackEventQueue(m_clientId)
      , m_workerThread()
  {
      // Only the very first instance hooks up paho tracing.
      const bool firstInstance = ( 0 == s_numberOfInstances++ );
      if( firstInstance )
      {
          MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho );
      }

      LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) );
      parseEndpoint( _endpoint );

      const auto createResult = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr );
      if( MQTTASYNC_SUCCESS != createResult )
      {
          LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( createResult ) ) );
          return;
      }

      // The client handle exists: register the paho callbacks with this object as context
      // and start the thread that processes callback events.
      MQTTAsync_setCallbacks( m_client, static_cast<void*>( this ), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete );
      m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this );
  }
  
  // Tear the client down: unsubscribe and disconnect when still connected, remove the trace
  // callback when this is the last instance, destroy the paho handle and stop the worker thread.
  ClientPaho::~ClientPaho()
  {
      LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) );

      if( !MQTTAsync_isConnected( m_client ) )
      {
          // If the status was already disconnected this call does nothing
          setConnectionStatus( ConnectionStatus::Disconnected );
      }
      else
      {
          unsubscribeAll();

          // Give outstanding operations up to two seconds, then disconnect and wait for it.
          waitForCompletion( std::chrono::milliseconds(2000), std::set<int32_t>{} );
          disconnect( true, 5000 );
      }

      // fetch_sub returns the previous value, so 1 means this was the last live instance.
      const bool lastInstance = ( 1 == s_numberOfInstances.fetch_sub( 1 ) );
      if( lastInstance )
      {
          // encountered a case where termination of the logging system within paho led to a segfault.
          // This was a paho thread that was cleaned while at the same time the logging system was terminated.
          // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less
          // frequently.
          MQTTAsync_setTraceCallback( nullptr );
      }

      MQTTAsync_destroy( &m_client );

      // Stop the event queue first, then wait for the worker thread (if one was started).
      m_callbackEventQueue.stop();
      if( m_workerThread.joinable() )
      {
          m_workerThread.join();
      }
  }
  
  // Returns a copy of the client id that was passed to the constructor.
  std::string ClientPaho::clientId() const
  {
      return m_clientId;
  }
  
  // Returns the current value of m_connectionStatus. No lock is taken here.
  ConnectionStatus ClientPaho::connectionStatus() const
  {
      return m_connectionStatus;
  }
  
31eece9b   Steven   added LWT ( last ...
190
  std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
51becbde   Peter M. Groen   Committed the ent...
191
192
193
  {
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
2b967ea7   Steven   promise is not wo...
194
          if( ConnectionStatus::Disconnected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
195
196
197
          {
              return -1;
          }
2b967ea7   Steven   promise is not wo...
198
          setConnectionStatus( ConnectionStatus::ConnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
199
200
      }
  
76d01373   Peter M. Groen   Fix on connection
201
202
      LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) );
  
51becbde   Peter M. Groen   Committed the ent...
203
      MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
9421324b   Peter M. Groen   First fix on conn...
204
      conn_opts.keepAliveInterval = 5;
51becbde   Peter M. Groen   Committed the ent...
205
      conn_opts.cleansession = 1;
2c0c99a5   Peter M. Groen   Fix connect Callb...
206
      conn_opts.onSuccess = nullptr;
51becbde   Peter M. Groen   Committed the ent...
207
208
209
      conn_opts.onFailure = &ClientPaho::onConnectFailure;
      conn_opts.context = this;
      conn_opts.automaticReconnect = 1;
31eece9b   Steven   added LWT ( last ...
210
  
76d01373   Peter M. Groen   Fix on connection
211
212
213
214
      // Make sure we get a signal if the promise is fulfilled
      auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect );
      if( MQTTASYNC_SUCCESS == ccb )
      {
ca0cf29e   Steven   syntax fixes, con...
215
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
76d01373   Peter M. Groen   Fix on connection
216
217
218
      }
      else
      {
ca0cf29e   Steven   syntax fixes, con...
219
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
76d01373   Peter M. Groen   Fix on connection
220
221
222
      }
  
      // Setup the last will and testament, if so desired.
31eece9b   Steven   added LWT ( last ...
223
224
225
226
227
228
229
      if( !lwt.topic().empty() )
      {
          MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
          will_opts.message = lwt.message().c_str();
          will_opts.topicName = lwt.topic().c_str();
  
          conn_opts.will = &will_opts;
76d01373   Peter M. Groen   Fix on connection
230
231
  
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) );
31eece9b   Steven   added LWT ( last ...
232
233
234
235
236
237
      }
      else
      {
          conn_opts.will = nullptr;
      }
  
2b967ea7   Steven   promise is not wo...
238
      if( !m_username.empty() )
51becbde   Peter M. Groen   Committed the ent...
239
240
241
242
      {
          conn_opts.username = m_username.c_str();
      }
  
2b967ea7   Steven   promise is not wo...
243
      if( !m_password.empty() )
51becbde   Peter M. Groen   Committed the ent...
244
245
246
247
248
249
250
      {
          conn_opts.password = m_password.c_str();
      }
  
      std::promise<void> waitForConnectPromise{};
      auto waitForConnect = waitForConnectPromise.get_future();
      m_connectPromise.reset();
2b967ea7   Steven   promise is not wo...
251
      if( wait )
51becbde   Peter M. Groen   Committed the ent...
252
      {
2b967ea7   Steven   promise is not wo...
253
          m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) );
51becbde   Peter M. Groen   Committed the ent...
254
255
256
      }
  
      {
2b967ea7   Steven   promise is not wo...
257
258
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
          if( !m_pendingOperations.insert( -100 ).second )
51becbde   Peter M. Groen   Committed the ent...
259
260
261
          {
              // Write something
          }
2b967ea7   Steven   promise is not wo...
262
          m_operationResult.erase( -100 );
51becbde   Peter M. Groen   Committed the ent...
263
264
      }
  
2b967ea7   Steven   promise is not wo...
265
266
      int rc = MQTTAsync_connect( m_client, &conn_opts );
      if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
267
      {
2b967ea7   Steven   promise is not wo...
268
269
          setConnectionStatus( ConnectionStatus::Disconnected );
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
270
271
272
273
          m_operationResult[-100] = false;
          m_pendingOperations.erase(-100);
      }
  
2b967ea7   Steven   promise is not wo...
274
      if( wait )
51becbde   Peter M. Groen   Committed the ent...
275
276
277
278
279
280
281
      {
          waitForConnect.get();
          m_connectPromise.reset();
      }
      return -100;
  }
  
0c424e03   Steven   syntax fixes. WIP
282
  std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
51becbde   Peter M. Groen   Committed the ent...
283
284
285
286
287
  {
      ConnectionStatus currentStatus = m_connectionStatus;
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
0c424e03   Steven   syntax fixes. WIP
288
289
          if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus )
          {
51becbde   Peter M. Groen   Committed the ent...
290
291
292
293
              return -1;
          }
  
          currentStatus = m_connectionStatus;
0c424e03   Steven   syntax fixes. WIP
294
          setConnectionStatus( ConnectionStatus::DisconnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
295
296
297
298
299
300
301
302
303
304
305
      }
  
      MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize();
      disconn_opts.timeout = timeoutMs;
      disconn_opts.onSuccess = &ClientPaho::onDisconnectSuccess;
      disconn_opts.onFailure = &ClientPaho::onDisconnectFailure;
      disconn_opts.context = this;
  
      std::promise<void> waitForDisconnectPromise{};
      auto waitForDisconnect = waitForDisconnectPromise.get_future();
      m_disconnectPromise.reset();
0c424e03   Steven   syntax fixes. WIP
306
307
      if( wait )
      {
51becbde   Peter M. Groen   Committed the ent...
308
309
310
311
312
          m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise));
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
313
          if( !m_pendingOperations.insert( -200 ).second )
51becbde   Peter M. Groen   Committed the ent...
314
          {
ca0cf29e   Steven   syntax fixes, con...
315
              LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
316
          }
ca0cf29e   Steven   syntax fixes, con...
317
          m_operationResult.erase( -200 );
51becbde   Peter M. Groen   Committed the ent...
318
319
      }
  
ca0cf29e   Steven   syntax fixes, con...
320
      int rc = MQTTAsync_disconnect( m_client, &disconn_opts );
0c424e03   Steven   syntax fixes. WIP
321
      if( MQTTASYNC_SUCCESS != rc )
76d01373   Peter M. Groen   Fix on connection
322
      {
0c424e03   Steven   syntax fixes. WIP
323
          if( MQTTASYNC_DISCONNECTED == rc )
76d01373   Peter M. Groen   Fix on connection
324
          {
51becbde   Peter M. Groen   Committed the ent...
325
326
327
              currentStatus = ConnectionStatus::Disconnected;
          }
  
0c424e03   Steven   syntax fixes. WIP
328
329
          setConnectionStatus( currentStatus );
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
330
          m_operationResult[-200] = false;
ca0cf29e   Steven   syntax fixes, con...
331
          m_pendingOperations.erase( -200 );
51becbde   Peter M. Groen   Committed the ent...
332
  
0c424e03   Steven   syntax fixes. WIP
333
          if( MQTTASYNC_DISCONNECTED == rc )
2b967ea7   Steven   promise is not wo...
334
          {
51becbde   Peter M. Groen   Committed the ent...
335
336
              return -1;
          }
ca0cf29e   Steven   syntax fixes, con...
337
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
338
339
      }
  
2b967ea7   Steven   promise is not wo...
340
341
      if( wait )
      {
51becbde   Peter M. Groen   Committed the ent...
342
343
          if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
          {
ca0cf29e   Steven   syntax fixes, con...
344
              LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) );
51becbde   Peter M. Groen   Committed the ent...
345
346
347
348
349
350
351
          }
          waitForDisconnect.get();
          m_disconnectPromise.reset();
      }
      return -200;
  }
  
ca0cf29e   Steven   syntax fixes, con...
352
  std::int32_t ClientPaho::publish( const MqttMessage& message, int qos )
51becbde   Peter M. Groen   Committed the ent...
353
  {
0c424e03   Steven   syntax fixes. WIP
354
      if( ConnectionStatus::DisconnectInProgress == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
355
      {
ca0cf29e   Steven   syntax fixes, con...
356
          LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) );
51becbde   Peter M. Groen   Committed the ent...
357
358
          return -1;
      }
0c424e03   Steven   syntax fixes. WIP
359
      else if( ConnectionStatus::Disconnected == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
360
      {
ca0cf29e   Steven   syntax fixes, con...
361
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) );
2b967ea7   Steven   promise is not wo...
362
          connect( true );
51becbde   Peter M. Groen   Committed the ent...
363
364
      }
  
0c424e03   Steven   syntax fixes. WIP
365
      if( !isValidTopic(message.topic() ) )
51becbde   Peter M. Groen   Committed the ent...
366
      {
ca0cf29e   Steven   syntax fixes, con...
367
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) );
51becbde   Peter M. Groen   Committed the ent...
368
369
      }
  
0c424e03   Steven   syntax fixes. WIP
370
      if( qos > 2 )
51becbde   Peter M. Groen   Committed the ent...
371
372
373
      {
          qos = 2;
      }
0c424e03   Steven   syntax fixes. WIP
374
      else if( qos < 0 )
51becbde   Peter M. Groen   Committed the ent...
375
376
377
378
      {
          qos = 0;
      }
  
51becbde   Peter M. Groen   Committed the ent...
379
      std::unique_lock<std::mutex> lck(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
380
      if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes )
a670240b   Peter M. Groen   Fix on connection
381
      {
51becbde   Peter M. Groen   Committed the ent...
382
          m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
ca0cf29e   Steven   syntax fixes, con...
383
          if( ConnectionStatus::ReconnectInProgress == m_connectionStatus )
a670240b   Peter M. Groen   Fix on connection
384
          {
0c424e03   Steven   syntax fixes. WIP
385
              LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." );
ca0cf29e   Steven   syntax fixes, con...
386
              m_pendingPublishes.push_front( Publish{ qos, message } );
51becbde   Peter M. Groen   Committed the ent...
387
388
389
390
              return -1;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
391
      return publishInternal( message, qos );
51becbde   Peter M. Groen   Committed the ent...
392
393
394
395
396
397
  }
  
  void ClientPaho::publishPending()
  {
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
0c424e03   Steven   syntax fixes. WIP
398
          if( !m_processPendingPublishes )
2b967ea7   Steven   promise is not wo...
399
          {
51becbde   Peter M. Groen   Committed the ent...
400
401
402
403
              return;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
404
      if( ConnectionStatus::Connected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
405
      {
a670240b   Peter M. Groen   Fix on connection
406
          LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) )
51becbde   Peter M. Groen   Committed the ent...
407
408
      }
  
0c424e03   Steven   syntax fixes. WIP
409
      while( !m_pendingPublishes.empty() )
51becbde   Peter M. Groen   Committed the ent...
410
411
      {
          const auto& pub = m_pendingPublishes.back();
ca0cf29e   Steven   syntax fixes, con...
412
          publishInternal( pub.data, pub.qos );
51becbde   Peter M. Groen   Committed the ent...
413
414
415
416
417
418
419
420
421
422
423
  
          m_pendingPublishes.pop_back();
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          m_processPendingPublishes = false;
      }
      m_pendingPublishesReadyCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
424
  std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb )
51becbde   Peter M. Groen   Committed the ent...
425
  {
0c424e03   Steven   syntax fixes. WIP
426
      if( ConnectionStatus::Connected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
427
428
429
430
      {
          // MqttException, "Not connected"
      }
  
0c424e03   Steven   syntax fixes. WIP
431
      if( !isValidTopic( topic ) )
51becbde   Peter M. Groen   Committed the ent...
432
433
434
435
      {
          // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
      }
  
0c424e03   Steven   syntax fixes. WIP
436
      if( qos > 2 )
51becbde   Peter M. Groen   Committed the ent...
437
438
439
      {
          qos = 2;
      }
0c424e03   Steven   syntax fixes. WIP
440
      else if( qos < 0 )
51becbde   Peter M. Groen   Committed the ent...
441
442
443
444
445
446
447
448
      {
          qos = 0;
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  
          auto itExisting = m_subscriptions.find(topic);
0c424e03   Steven   syntax fixes. WIP
449
450
451
452
          if( m_subscriptions.end() != itExisting )
          {
              if( itExisting->second.qos == qos )
              {
51becbde   Peter M. Groen   Committed the ent...
453
454
455
456
457
458
                  return -1;
              }
              // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic);
          }
  
          auto itPending = m_pendingSubscriptions.find(topic);
0c424e03   Steven   syntax fixes. WIP
459
460
461
462
463
464
465
          if( m_pendingSubscriptions.end() != itPending )
          {
              if( itPending->second.qos == qos )
              {
                  auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; } );
                  if( m_subscribeTokenToTopic.end() != itToken )
                  {
51becbde   Peter M. Groen   Committed the ent...
466
467
                      return itToken->first;
                  }
0c424e03   Steven   syntax fixes. WIP
468
469
                  else
                  {
51becbde   Peter M. Groen   Committed the ent...
470
471
472
473
474
475
476
                      return -1;
                  }
              }
              // (OverlappingTopicException, "pending subscription with same topic, but different qos", topic);
          }
  
          std::string existingTopic{};
0c424e03   Steven   syntax fixes. WIP
477
          if( isOverlappingInternal( topic, existingTopic ) )
51becbde   Peter M. Groen   Committed the ent...
478
479
480
481
          {
              // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
          }
  
ca0cf29e   Steven   syntax fixes, con...
482
483
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
          m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) );
51becbde   Peter M. Groen   Committed the ent...
484
      }
0c424e03   Steven   syntax fixes. WIP
485
      return subscribeInternal( topic, qos );
51becbde   Peter M. Groen   Committed the ent...
486
487
488
489
  }
  
  void ClientPaho::resubscribe()
  {
ca0cf29e   Steven   syntax fixes, con...
490
      decltype( m_pendingSubscriptions ) pendingSubscriptions{};
51becbde   Peter M. Groen   Committed the ent...
491
492
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
493
          std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) );
51becbde   Peter M. Groen   Committed the ent...
494
495
      }
  
0c424e03   Steven   syntax fixes. WIP
496
      for( const auto& s : pendingSubscriptions )
51becbde   Peter M. Groen   Committed the ent...
497
      {
0c424e03   Steven   syntax fixes. WIP
498
          subscribeInternal( s.first, s.second.qos );
51becbde   Peter M. Groen   Committed the ent...
499
500
501
      }
  }
  
31eece9b   Steven   added LWT ( last ...
502
  std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos )
51becbde   Peter M. Groen   Committed the ent...
503
504
  {
      {
0c424e03   Steven   syntax fixes. WIP
505
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
506
          bool found = false;
0c424e03   Steven   syntax fixes. WIP
507
          for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
508
          {
0c424e03   Steven   syntax fixes. WIP
509
              if( topic == s.first && qos == s.second.qos )
51becbde   Peter M. Groen   Committed the ent...
510
511
512
513
514
              {
                  found = true;
                  break;
              }
          }
0c424e03   Steven   syntax fixes. WIP
515
          if( !found )
51becbde   Peter M. Groen   Committed the ent...
516
517
518
519
520
521
522
523
524
525
526
527
528
529
          {
              return -1;
          }
      }
  
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onUnsubscribeSuccess;
      opts.onFailure = &ClientPaho::onUnsubscribeFailure;
      opts.context = this;
  
      {
          // Need to lock the mutex because it is possible that the callback is faster than
          // the insertion of the token into the pending operations.
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
530
          auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts );
0c424e03   Steven   syntax fixes. WIP
531
          if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
532
          {
ca0cf29e   Steven   syntax fixes, con...
533
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
534
535
          }
  
0c424e03   Steven   syntax fixes. WIP
536
          if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
537
          {
ca0cf29e   Steven   syntax fixes, con...
538
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token )  + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
539
540
          }
  
0c424e03   Steven   syntax fixes. WIP
541
542
          m_operationResult.erase( opts.token );
          if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 )
51becbde   Peter M. Groen   Committed the ent...
543
          {
ca0cf29e   Steven   syntax fixes, con...
544
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic  " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
545
546
547
548
549
550
551
552
553
554
555
556
557
          }
          m_lastUnsubscribe = opts.token; // centos7 workaround
          m_unsubscribeTokenToTopic[opts.token] = topic;
      }
  
      // Because of a bug in paho-c on centos7 the unsubscribes need to be sequential (best effort).
      this->waitForCompletion(std::chrono::seconds(1), std::set<int32_t>{ opts.token });
  
      return opts.token;
  }
  
  void ClientPaho::unsubscribeAll()
  {
0c424e03   Steven   syntax fixes. WIP
558
      decltype( m_subscriptions ) subscriptions{};
51becbde   Peter M. Groen   Committed the ent...
559
560
561
562
563
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          subscriptions = m_subscriptions;
      }
  
0c424e03   Steven   syntax fixes. WIP
564
565
      for( const auto& s : subscriptions )
      {
ca0cf29e   Steven   syntax fixes, con...
566
          this->unsubscribe( s.first, s.second.qos );
51becbde   Peter M. Groen   Committed the ent...
567
568
569
570
571
      }
  }
  
  std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const
  {
ca0cf29e   Steven   syntax fixes, con...
572
573
574
      if( waitFor <= std::chrono::milliseconds( 0 ) )
      {
          return std::chrono::milliseconds( 0 );
51becbde   Peter M. Groen   Committed the ent...
575
576
577
      }
      std::chrono::milliseconds timeElapsed{};
      {
ca0cf29e   Steven   syntax fixes, con...
578
          osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds )
51becbde   Peter M. Groen   Committed the ent...
579
          {
ca0cf29e   Steven   syntax fixes, con...
580
              timeElapsed = std::chrono::ceil<std::chrono::milliseconds>( sinceStart );
51becbde   Peter M. Groen   Committed the ent...
581
582
          });
          std::unique_lock<std::mutex> lck(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
583
  
51becbde   Peter M. Groen   Committed the ent...
584
585
586
          // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations);
          m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]()
          {
ca0cf29e   Steven   syntax fixes, con...
587
              if( tokens.empty() )
51becbde   Peter M. Groen   Committed the ent...
588
589
590
              { // wait for all operations to end
                  return m_pendingOperations.empty();
              }
ca0cf29e   Steven   syntax fixes, con...
591
              else if( tokens.size() == 1 )
51becbde   Peter M. Groen   Committed the ent...
592
              {
ca0cf29e   Steven   syntax fixes, con...
593
                  return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end();
51becbde   Peter M. Groen   Committed the ent...
594
595
596
597
598
599
600
601
602
              }
              std::vector<std::int32_t> intersect{};
              std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect));
              return intersect.empty();
          } );
      }
      return timeElapsed;
  }
  
ca0cf29e   Steven   syntax fixes, con...
603
  bool ClientPaho::isOverlapping( const std::string& topic ) const
51becbde   Peter M. Groen   Committed the ent...
604
605
  {
      std::string existingTopic{};
ca0cf29e   Steven   syntax fixes, con...
606
      return isOverlapping( topic, existingTopic );
51becbde   Peter M. Groen   Committed the ent...
607
608
  }
  
ca0cf29e   Steven   syntax fixes, con...
609
  bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const
51becbde   Peter M. Groen   Committed the ent...
610
611
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
612
      return isOverlappingInternal( topic, existingTopic );
51becbde   Peter M. Groen   Committed the ent...
613
614
615
616
617
618
619
  }
  
  // Returns a copy of the tokens of all operations that are currently pending.
  // The copy is made while m_mutex is held.
  std::vector<std::int32_t> ClientPaho::pendingOperations() const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      // Parentheses on purpose: iterator-range constructor, not an initializer list.
      return std::vector<std::int32_t>( m_pendingOperations.begin(), m_pendingOperations.end() );
  }
  
  // Returns true when at least one subscription is registered in m_pendingSubscriptions.
  // The check is made while m_mutex is held.
  bool ClientPaho::hasPendingSubscriptions() const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      const bool nonePending = m_pendingSubscriptions.empty();
      return !nonePending;
  }
  
ca0cf29e   Steven   syntax fixes, con...
630
  // Looks up the stored result of the operation identified by token.
  // Returns an empty optional when no result is stored for the token, otherwise the stored value.
  // The lookup is made while m_mutex is held.
  boost::optional<bool> ClientPaho::operationResult( std::int32_t token ) const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      const auto found = m_operationResult.find( token );
      if( found == m_operationResult.end() )
      {
          return boost::optional<bool>();
      }
      return boost::optional<bool>( found->second );
  }
  
ca0cf29e   Steven   syntax fixes, con...
642
  void ClientPaho::parseEndpoint( const std::string& _endpoint )
51becbde   Peter M. Groen   Committed the ent...
643
  {
ca0cf29e   Steven   syntax fixes, con...
644
645
      auto ep = UriParser::parse( _endpoint );
      if( ep.find( "user" ) != ep.end() )
51becbde   Peter M. Groen   Committed the ent...
646
647
648
649
650
      {
          m_username = ep["user"];
          ep["user"].clear();
      }
  
ca0cf29e   Steven   syntax fixes, con...
651
      if( ep.find( "password" ) != ep.end() )
51becbde   Peter M. Groen   Committed the ent...
652
653
654
655
      {
          m_password = ep["password"];
          ep["password"].clear();
      }
ca0cf29e   Steven   syntax fixes, con...
656
      m_endpoint = UriParser::toString( ep );
51becbde   Peter M. Groen   Committed the ent...
657
658
  }
  
ca0cf29e   Steven   syntax fixes, con...
659
  std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos )
51becbde   Peter M. Groen   Committed the ent...
660
661
662
663
664
665
666
667
668
669
670
671
  {
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onPublishSuccess;
      opts.onFailure = &ClientPaho::onPublishFailure;
      opts.context = this;
      auto msg = message.toAsyncMessage();
      msg.qos = qos;
  
      // Need to lock the mutex because it is possible that the callback is faster than
      // the insertion of the token into the pending operations.
  
      // OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
672
      auto rc = MQTTAsync_sendMessage( m_client, message.topic().c_str(), &msg, &opts );
ca0cf29e   Steven   syntax fixes, con...
673
      if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
674
      {
ca0cf29e   Steven   syntax fixes, con...
675
          LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
676
677
      }
  
ca0cf29e   Steven   syntax fixes, con...
678
      if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
679
      {
ca0cf29e   Steven   syntax fixes, con...
680
          LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
681
      }
ca0cf29e   Steven   syntax fixes, con...
682
      m_operationResult.erase( opts.token );
51becbde   Peter M. Groen   Committed the ent...
683
684
685
      return opts.token;
  }
  
ca0cf29e   Steven   syntax fixes, con...
686
  std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos )
51becbde   Peter M. Groen   Committed the ent...
687
688
689
690
691
692
693
694
695
  {
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onSubscribeSuccess;
      opts.onFailure = &ClientPaho::onSubscribeFailure;
      opts.context = this;
  
      // Need to lock the mutex because it is possible that the callback is faster than
      // the insertion of the token into the pending operations.
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
696
      auto rc = MQTTAsync_subscribe( m_client, topic.c_str(), qos, &opts );
51becbde   Peter M. Groen   Committed the ent...
697
698
      if (MQTTASYNC_SUCCESS != rc)
      {
ca0cf29e   Steven   syntax fixes, con...
699
700
          m_pendingSubscriptions.erase( topic );
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
701
702
      }
  
ca0cf29e   Steven   syntax fixes, con...
703
      if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
704
      {
ca0cf29e   Steven   syntax fixes, con...
705
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
706
      }
ca0cf29e   Steven   syntax fixes, con...
707
708
      m_operationResult.erase( opts.token );
      if( m_subscribeTokenToTopic.count( opts.token ) > 0 )
51becbde   Peter M. Groen   Committed the ent...
709
      {
ca0cf29e   Steven   syntax fixes, con...
710
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
711
712
713
714
715
      }
      m_subscribeTokenToTopic[opts.token] = topic;
      return opts.token;
  }
  
0c424e03   Steven   syntax fixes. WIP
716
  // Stores the new connection status. When the status actually changed and a callback is
  // registered, the callback is invoked with the client id and the new status.
  void ClientPaho::setConnectionStatus( ConnectionStatus status )
  {
      const ConnectionStatus previous = m_connectionStatus;
      m_connectionStatus = status;

      if( previous == status )
      {
          return; // nothing changed, nobody to inform
      }
      if( m_connectionStatusCallback )
      {
          m_connectionStatusCallback( m_clientId, status );
      }
  }
  
0c424e03   Steven   syntax fixes. WIP
726
  bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const
51becbde   Peter M. Groen   Committed the ent...
727
728
  {
      existingTopic.clear();
0c424e03   Steven   syntax fixes. WIP
729
      for( const auto& s : m_pendingSubscriptions )
51becbde   Peter M. Groen   Committed the ent...
730
      {
0c424e03   Steven   syntax fixes. WIP
731
          if( testForOverlap( s.first, topic ) )
51becbde   Peter M. Groen   Committed the ent...
732
733
734
735
736
737
          {
              existingTopic = s.first;
              return true;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
738
      for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
739
      {
ed30280f   Steven   last changes.
740
          if( testForOverlap( s.first, topic ) )
51becbde   Peter M. Groen   Committed the ent...
741
742
743
744
745
746
747
748
          {
              existingTopic = s.first;
              return true;
          }
      }
      return false;
  }
  
ed30280f   Steven   last changes.
749
  void ClientPaho::pushIncomingEvent( std::function<void()> ev )
51becbde   Peter M. Groen   Committed the ent...
750
  {
ed30280f   Steven   last changes.
751
      m_callbackEventQueue.push( ev );
51becbde   Peter M. Groen   Committed the ent...
752
753
754
755
  }
  
  void ClientPaho::callbackEventHandler()
  {
ca0cf29e   Steven   syntax fixes, con...
756
      LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) );
0c424e03   Steven   syntax fixes. WIP
757
758
      for( ;; )
      {
51becbde   Peter M. Groen   Committed the ent...
759
          std::vector<std::function<void()>> events;
0c424e03   Steven   syntax fixes. WIP
760
          if( !m_callbackEventQueue.pop(events) )
51becbde   Peter M. Groen   Committed the ent...
761
762
763
764
          {
              break;
          }
  
0c424e03   Steven   syntax fixes. WIP
765
          for( const auto& ev : events )
51becbde   Peter M. Groen   Committed the ent...
766
767
          {
              ev();
51becbde   Peter M. Groen   Committed the ent...
768
769
          }
      }
ed30280f   Steven   last changes.
770
      LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - leaving callback event handler" ) );
51becbde   Peter M. Groen   Committed the ent...
771
  }
0c424e03   Steven   syntax fixes. WIP
772
  void ClientPaho::onConnectOnInstance( const std::string& cause )
51becbde   Peter M. Groen   Committed the ent...
773
  {
ca0cf29e   Steven   syntax fixes, con...
774
      (void) cause;
51becbde   Peter M. Groen   Committed the ent...
775
776
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
777
          std::copy( m_subscriptions.begin(), m_subscriptions.end(), std::inserter( m_pendingSubscriptions, m_pendingSubscriptions.end() ) );
51becbde   Peter M. Groen   Committed the ent...
778
779
780
781
          m_subscriptions.clear();
          m_processPendingPublishes = true; // all publishes are on hold until publishPending is called.
      }
  
ed30280f   Steven   last changes.
782
      setConnectionStatus( ConnectionStatus::Connected );
51becbde   Peter M. Groen   Committed the ent...
783
784
  }
  
2c0c99a5   Peter M. Groen   Fix connect Callb...
785
  void ClientPaho::onConnectSuccessOnInstance()
51becbde   Peter M. Groen   Committed the ent...
786
  {
51becbde   Peter M. Groen   Committed the ent...
787
788
789
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // Register the connect callback that is used in reconnect scenarios.
0c424e03   Steven   syntax fixes. WIP
790
791
          auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect );
          if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
792
          {
0c424e03   Steven   syntax fixes. WIP
793
              LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
51becbde   Peter M. Groen   Committed the ent...
794
          }
2c0c99a5   Peter M. Groen   Fix connect Callb...
795
  
51becbde   Peter M. Groen   Committed the ent...
796
797
798
799
800
801
          // For MQTTV5
          //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
          //if (MQTTASYNC_SUCCESS != rc) {
          //    // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
          //}
          // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
2c0c99a5   Peter M. Groen   Fix connect Callb...
802
  
51becbde   Peter M. Groen   Committed the ent...
803
804
805
          m_operationResult[-100] = true;
          m_pendingOperations.erase(-100);
      }
9421324b   Peter M. Groen   First fix on conn...
806
  
0c424e03   Steven   syntax fixes. WIP
807
808
      setConnectionStatus( ConnectionStatus::Connected );
      if( m_connectPromise )
51becbde   Peter M. Groen   Committed the ent...
809
      {
ca0cf29e   Steven   syntax fixes, con...
810
          LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) );
51becbde   Peter M. Groen   Committed the ent...
811
812
813
814
815
          m_connectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
816
  void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
817
  {
0c424e03   Steven   syntax fixes. WIP
818
819
      (void) response;
      LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
51becbde   Peter M. Groen   Committed the ent...
820
821
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
51becbde   Peter M. Groen   Committed the ent...
822
823
824
825
          // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
          m_operationResult[-100] = false;
          m_pendingOperations.erase(-100);
      }
ca0cf29e   Steven   syntax fixes, con...
826
      if( ConnectionStatus::ConnectInProgress == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
827
      {
ca0cf29e   Steven   syntax fixes, con...
828
          setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
829
830
831
832
833
834
835
836
837
      }
      m_operationsCompleteCV.notify_all();
  }
  
  //void ClientPaho::onDisconnectOnInstance(enum MQTTReasonCodes reasonCode)
  //{
  //    MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode));
  //}
  
ca0cf29e   Steven   syntax fixes, con...
838
  void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& )
51becbde   Peter M. Groen   Committed the ent...
839
  {
ca0cf29e   Steven   syntax fixes, con...
840
      LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) );
51becbde   Peter M. Groen   Committed the ent...
841
842
843
844
845
846
847
848
849
850
851
852
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          m_subscriptions.clear();
          m_pendingSubscriptions.clear();
          m_subscribeTokenToTopic.clear();
          m_unsubscribeTokenToTopic.clear();
  
          // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - pending operations : %2, removing all operations", m_clientId, m_pendingOperations);
          m_operationResult[-200] = true;
          m_pendingOperations.clear();
      }
  
0c424e03   Steven   syntax fixes. WIP
853
      setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
854
  
0c424e03   Steven   syntax fixes. WIP
855
856
      if( m_disconnectPromise )
      {
51becbde   Peter M. Groen   Committed the ent...
857
858
859
860
861
          m_disconnectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
862
  void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
863
  {
0c424e03   Steven   syntax fixes. WIP
864
      (void) response;
ca0cf29e   Steven   syntax fixes, con...
865
      LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) );
51becbde   Peter M. Groen   Committed the ent...
866
      {
ca0cf29e   Steven   syntax fixes, con...
867
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);        
51becbde   Peter M. Groen   Committed the ent...
868
869
870
871
872
          // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations);
          m_operationResult[-200] = false;
          m_pendingOperations.erase(-200);
      }
  
0c424e03   Steven   syntax fixes. WIP
873
      if( MQTTAsync_isConnected( m_client ) )
51becbde   Peter M. Groen   Committed the ent...
874
      {
0c424e03   Steven   syntax fixes. WIP
875
          setConnectionStatus( ConnectionStatus::Connected );
51becbde   Peter M. Groen   Committed the ent...
876
877
878
      }
      else
      {
0c424e03   Steven   syntax fixes. WIP
879
          setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
880
881
      }
  
0c424e03   Steven   syntax fixes. WIP
882
      if( m_disconnectPromise )
51becbde   Peter M. Groen   Committed the ent...
883
884
885
886
887
888
      {
          m_disconnectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
889
  void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
890
891
  {
      auto pd = response.publishData();
ca0cf29e   Steven   syntax fixes, con...
892
      LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
51becbde   Peter M. Groen   Committed the ent...
893
894
895
896
897
898
899
900
901
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = true;
          m_pendingOperations.erase(response.token());
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
902
  void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
903
  {
ca0cf29e   Steven   syntax fixes, con...
904
      LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
51becbde   Peter M. Groen   Committed the ent...
905
906
907
908
909
910
911
912
913
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
          m_pendingOperations.erase(response.token());
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
914
  void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
915
  {
ca0cf29e   Steven   syntax fixes, con...
916
917
918
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) );
  
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
51becbde   Peter M. Groen   Committed the ent...
919
920
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      bool operationOk = false;
ca0cf29e   Steven   syntax fixes, con...
921
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]()
51becbde   Peter M. Groen   Committed the ent...
922
923
924
      {
          // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = operationOk;
ca0cf29e   Steven   syntax fixes, con...
925
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
926
      });
ca0cf29e   Steven   syntax fixes, con...
927
928
      auto it = m_subscribeTokenToTopic.find( response.token() );
      if( m_subscribeTokenToTopic.end() == it )
0c424e03   Steven   syntax fixes. WIP
929
      {
ca0cf29e   Steven   syntax fixes, con...
930
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
931
932
933
934
935
          return;
      }
      auto topic = it->second;
      m_subscribeTokenToTopic.erase(it);
  
ca0cf29e   Steven   syntax fixes, con...
936
937
      auto pendingIt = m_pendingSubscriptions.find( topic );
      if( m_pendingSubscriptions.end() == pendingIt )
51becbde   Peter M. Groen   Committed the ent...
938
      {
ca0cf29e   Steven   syntax fixes, con...
939
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
940
941
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
942
      if( response.qos() != pendingIt->second.qos )
51becbde   Peter M. Groen   Committed the ent...
943
      {
ca0cf29e   Steven   syntax fixes, con...
944
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) );
51becbde   Peter M. Groen   Committed the ent...
945
      }
ca0cf29e   Steven   syntax fixes, con...
946
947
948
949
  
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) );
      m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) );
      m_pendingSubscriptions.erase( pendingIt );
51becbde   Peter M. Groen   Committed the ent...
950
951
952
      operationOk = true;
  }
  
ca0cf29e   Steven   syntax fixes, con...
953
  void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
954
  {
ca0cf29e   Steven   syntax fixes, con...
955
956
957
      LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
  
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
958
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
959
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
51becbde   Peter M. Groen   Committed the ent...
960
961
962
      {
          // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
ca0cf29e   Steven   syntax fixes, con...
963
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
964
965
      });
  
ca0cf29e   Steven   syntax fixes, con...
966
967
      auto it = m_subscribeTokenToTopic.find( response.token() );
      if( m_subscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
968
      {
ca0cf29e   Steven   syntax fixes, con...
969
          LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
970
971
972
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
973
      m_subscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
974
  
ca0cf29e   Steven   syntax fixes, con...
975
976
      auto pendingIt = m_pendingSubscriptions.find( topic );
      if( m_pendingSubscriptions.end() == pendingIt )
51becbde   Peter M. Groen   Committed the ent...
977
      {
ca0cf29e   Steven   syntax fixes, con...
978
          LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
979
980
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
981
982
      LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) );
      m_pendingSubscriptions.erase( pendingIt );
51becbde   Peter M. Groen   Committed the ent...
983
984
  }
  
ca0cf29e   Steven   syntax fixes, con...
985
  void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
986
  {
ca0cf29e   Steven   syntax fixes, con...
987
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) );
51becbde   Peter M. Groen   Committed the ent...
988
  
ca0cf29e   Steven   syntax fixes, con...
989
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
990
991
992
993
994
995
996
997
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  
      // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token.
      // As a workaround the last unsubscribe token is stored and is used when no valid token is available.
      // This is by no means bullet proof because rapid unsubscribes in succession will overwrite this member
      // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled
      // sequentially (see ClientPaho::unsubscribe)!
      auto token = response.token();
ca0cf29e   Steven   syntax fixes, con...
998
      if( -1 == token )
51becbde   Peter M. Groen   Committed the ent...
999
1000
1001
1002
1003
1004
      {
          token = m_lastUnsubscribe;
          m_lastUnsubscribe = -1;
      }
  
      bool operationOk = false;
ca0cf29e   Steven   syntax fixes, con...
1005
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]()
51becbde   Peter M. Groen   Committed the ent...
1006
1007
1008
      {
          // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token);
          m_operationResult[token] = operationOk;
ca0cf29e   Steven   syntax fixes, con...
1009
          m_pendingOperations.erase( token );
51becbde   Peter M. Groen   Committed the ent...
1010
1011
      });
  
ca0cf29e   Steven   syntax fixes, con...
1012
1013
      auto it = m_unsubscribeTokenToTopic.find( token );
      if( m_unsubscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
1014
      {
ca0cf29e   Steven   syntax fixes, con...
1015
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) );
51becbde   Peter M. Groen   Committed the ent...
1016
1017
1018
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
1019
      m_unsubscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
1020
  
ca0cf29e   Steven   syntax fixes, con...
1021
1022
1023
1024
      auto registeredIt = m_subscriptions.find( topic );
      if( m_subscriptions.end() == registeredIt )
      {
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1025
1026
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
1027
1028
1029
  
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) );
      m_subscriptions.erase( registeredIt );
51becbde   Peter M. Groen   Committed the ent...
1030
1031
1032
      operationOk = true;
  }
  
ca0cf29e   Steven   syntax fixes, con...
1033
  void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
1034
  {
ca0cf29e   Steven   syntax fixes, con...
1035
1036
      LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
1037
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
1038
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
51becbde   Peter M. Groen   Committed the ent...
1039
1040
1041
      {
          // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
ca0cf29e   Steven   syntax fixes, con...
1042
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
1043
1044
      });
  
ca0cf29e   Steven   syntax fixes, con...
1045
1046
      auto it = m_unsubscribeTokenToTopic.find( response.token() );
      if( m_unsubscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
1047
      {
ca0cf29e   Steven   syntax fixes, con...
1048
          LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1049
1050
1051
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
1052
      m_unsubscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
1053
1054
  }
  
ca0cf29e   Steven   syntax fixes, con...
1055
  int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message )
51becbde   Peter M. Groen   Committed the ent...
1056
  {
ca0cf29e   Steven   syntax fixes, con...
1057
      LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1058
  
ca0cf29e   Steven   syntax fixes, con...
1059
      std::function<void( MqttMessage )> cb;
51becbde   Peter M. Groen   Committed the ent...
1060
1061
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
1062
          for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
1063
          {
ca0cf29e   Steven   syntax fixes, con...
1064
              if( boost::regex_match( message.topic(), s.second.topicRegex ) )
51becbde   Peter M. Groen   Committed the ent...
1065
1066
1067
1068
1069
1070
              {
                  cb = s.second.callback;
              }
          }
      }
  
ca0cf29e   Steven   syntax fixes, con...
1071
      if( cb )
51becbde   Peter M. Groen   Committed the ent...
1072
      {
ca0cf29e   Steven   syntax fixes, con...
1073
          cb( message );
51becbde   Peter M. Groen   Committed the ent...
1074
1075
1076
      }
      else
      {
ca0cf29e   Steven   syntax fixes, con...
1077
          LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) );
51becbde   Peter M. Groen   Committed the ent...
1078
1079
1080
1081
      }
      return 1;
  }
  
ca0cf29e   Steven   syntax fixes, con...
1082
  /**
   * @brief Reports a completed delivery to the registered delivery-complete callback.
   *
   * @param token Token of the message that was delivered.
   */
  void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token )
  {
      LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) );

      // Nothing to report to when no callback has been installed.
      if( !m_deliveryCompleteCallback )
      {
          return;
      }

      m_deliveryCompleteCallback( m_clientId, static_cast<std::int32_t>( token ) );
  }
  
ca0cf29e   Steven   syntax fixes, con...
1091
  void ClientPaho::onConnectionLostOnInstance( const std::string& cause )
51becbde   Peter M. Groen   Committed the ent...
1092
  {
ca0cf29e   Steven   syntax fixes, con...
1093
      (void) cause;
51becbde   Peter M. Groen   Committed the ent...
1094
      // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause);
ca0cf29e   Steven   syntax fixes, con...
1095
      setConnectionStatus( ConnectionStatus::ReconnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
1096
1097
1098
  
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      // Remove all tokens related to subscriptions from the active operations.
ca0cf29e   Steven   syntax fixes, con...
1099
      for( const auto& p : m_subscribeTokenToTopic )
51becbde   Peter M. Groen   Committed the ent...
1100
1101
      {
          // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
ca0cf29e   Steven   syntax fixes, con...
1102
          m_pendingOperations.erase( p.first );
51becbde   Peter M. Groen   Committed the ent...
1103
1104
      }
  
ca0cf29e   Steven   syntax fixes, con...
1105
      for( const auto& p : m_unsubscribeTokenToTopic )
51becbde   Peter M. Groen   Committed the ent...
1106
1107
      {
          // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
ca0cf29e   Steven   syntax fixes, con...
1108
          m_pendingOperations.erase( p.first );
51becbde   Peter M. Groen   Committed the ent...
1109
1110
1111
1112
1113
1114
1115
      }
      // Clear the administration used in the subscribe process.
      m_subscribeTokenToTopic.clear();
      m_unsubscribeTokenToTopic.clear();
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1116
  void ClientPaho::onFirstConnect( void* context, char* cause )
9421324b   Peter M. Groen   First fix on conn...
1117
1118
  {
      LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1119
      if( context )
9421324b   Peter M. Groen   First fix on conn...
1120
      {
ca0cf29e   Steven   syntax fixes, con...
1121
1122
1123
          auto *cl = reinterpret_cast<ClientPaho*>( context );
          std::string reason( nullptr == cause ? "Unknown cause" : cause );
          cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } );
9421324b   Peter M. Groen   First fix on conn...
1124
1125
1126
      }
  }
  
ca0cf29e   Steven   syntax fixes, con...
1127
  void ClientPaho::onConnect( void* context, char* cause )
51becbde   Peter M. Groen   Committed the ent...
1128
  {
9421324b   Peter M. Groen   First fix on conn...
1129
      LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1130
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1131
      {
ca0cf29e   Steven   syntax fixes, con...
1132
1133
1134
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          std::string reason( nullptr == cause ? "unknown cause" : cause );
          cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } );
51becbde   Peter M. Groen   Committed the ent...
1135
1136
1137
1138
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1139
  void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1140
  {
2c0c99a5   Peter M. Groen   Fix connect Callb...
1141
      LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1142
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1143
      {
ca0cf29e   Steven   syntax fixes, con...
1144
1145
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
76d01373   Peter M. Groen   Fix on connection
1146
          {
51becbde   Peter M. Groen   Committed the ent...
1147
              // connect should always have a valid response struct.
ca0cf29e   Steven   syntax fixes, con...
1148
              LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" );
2c0c99a5   Peter M. Groen   Fix connect Callb...
1149
              return;
51becbde   Peter M. Groen   Committed the ent...
1150
          }
2c0c99a5   Peter M. Groen   Fix connect Callb...
1151
          // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
ca0cf29e   Steven   syntax fixes, con...
1152
          cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } );
51becbde   Peter M. Groen   Committed the ent...
1153
1154
1155
1156
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1157
  void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1158
  {
ca0cf29e   Steven   syntax fixes, con...
1159
1160
      LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) );
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1161
      {
ca0cf29e   Steven   syntax fixes, con...
1162
1163
1164
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
      }
  }
  
  //// static
  //void ClientPaho::onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode)
  //{
  //    apply_unused_parameters(properties);
  //    try {
  //        if (context) {
  //            auto* cl = reinterpret_cast<ClientPaho*>(context);
  //            cl->pushIncomingEvent([cl, reasonCode]() { cl->onDisconnectOnInstance(reasonCode); });
  //        }
  //    }
  //    catch (...) {
  //    }
  //    catch (const std::exception& e) {
  //        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - exception : %1", e.what());
  //    }
  //    catch (...) {
  //        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - unknown exception");
  //    }
  //}
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1189
  void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1190
  {
ca0cf29e   Steven   syntax fixes, con...
1191
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1192
      {
ca0cf29e   Steven   syntax fixes, con...
1193
1194
1195
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttSuccess resp( response ? response->token : 0 );
          cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1196
1197
1198
1199
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1200
  void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1201
  {
9421324b   Peter M. Groen   First fix on conn...
1202
      LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1203
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1204
      {
ca0cf29e   Steven   syntax fixes, con...
1205
1206
1207
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1208
1209
1210
1211
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1212
  void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1213
  {
ca0cf29e   Steven   syntax fixes, con...
1214
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1215
      {
ca0cf29e   Steven   syntax fixes, con...
1216
1217
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
51becbde   Peter M. Groen   Committed the ent...
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
          {
              // publish should always have a valid response struct.
              // toLogFile ("ClientPaho", "onPublishSuccess - no response data");
          }
          MqttSuccess resp(response->token, MqttMessage(response->alt.pub.destinationName == nullptr ? "null" : response->alt.pub.destinationName, response->alt.pub.message));
          cl->pushIncomingEvent([cl, resp]() { cl->onPublishSuccessOnInstance(resp); });
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1228
  void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1229
  {
ca0cf29e   Steven   syntax fixes, con...
1230
1231
      (void) response;
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1232
      {
ca0cf29e   Steven   syntax fixes, con...
1233
1234
1235
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1236
1237
1238
1239
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1240
  void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1241
  {
ca0cf29e   Steven   syntax fixes, con...
1242
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1243
      {
ca0cf29e   Steven   syntax fixes, con...
1244
1245
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
51becbde   Peter M. Groen   Committed the ent...
1246
1247
1248
1249
          {
              // subscribe should always have a valid response struct.
              // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data");
          }
ca0cf29e   Steven   syntax fixes, con...
1250
1251
          MqttSuccess resp( response->token, response->alt.qos );
          cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1252
1253
1254
1255
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1256
  void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1257
  {
ca0cf29e   Steven   syntax fixes, con...
1258
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1259
      {
ca0cf29e   Steven   syntax fixes, con...
1260
1261
1262
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1263
1264
1265
1266
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1267
  void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1268
  {
ca0cf29e   Steven   syntax fixes, con...
1269
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1270
      {
ca0cf29e   Steven   syntax fixes, con...
1271
1272
1273
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttSuccess resp( response ? response->token : -1 );
          cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1274
1275
1276
1277
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1278
  void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1279
  {
ca0cf29e   Steven   syntax fixes, con...
1280
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1281
      {
ca0cf29e   Steven   syntax fixes, con...
1282
1283
1284
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1285
1286
1287
1288
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1289
  int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message )
51becbde   Peter M. Groen   Committed the ent...
1290
1291
  {
  
ca0cf29e   Steven   syntax fixes, con...
1292
      OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]()
51becbde   Peter M. Groen   Committed the ent...
1293
      {
ca0cf29e   Steven   syntax fixes, con...
1294
1295
          MQTTAsync_freeMessage( &message );
          MQTTAsync_free( topicName );
51becbde   Peter M. Groen   Committed the ent...
1296
1297
      });
  
ca0cf29e   Steven   syntax fixes, con...
1298
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1299
      {
ca0cf29e   Steven   syntax fixes, con...
1300
1301
1302
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttMessage msg( topicName, *message );
          cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } );
51becbde   Peter M. Groen   Committed the ent...
1303
1304
1305
1306
1307
1308
      }
  
      return 1; // always return true. Otherwise this callback is triggered again.
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1309
  void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token )
51becbde   Peter M. Groen   Committed the ent...
1310
  {
ca0cf29e   Steven   syntax fixes, con...
1311
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1312
      {
ca0cf29e   Steven   syntax fixes, con...
1313
1314
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } );
51becbde   Peter M. Groen   Committed the ent...
1315
1316
1317
1318
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1319
  void ClientPaho::onConnectionLost( void* context, char* cause )
51becbde   Peter M. Groen   Committed the ent...
1320
  {
ca0cf29e   Steven   syntax fixes, con...
1321
      OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]()
51becbde   Peter M. Groen   Committed the ent...
1322
      {
ca0cf29e   Steven   syntax fixes, con...
1323
          if( cause )
51becbde   Peter M. Groen   Committed the ent...
1324
          {
ca0cf29e   Steven   syntax fixes, con...
1325
              MQTTAsync_free( cause );
51becbde   Peter M. Groen   Committed the ent...
1326
1327
1328
          }
      });
  
ca0cf29e   Steven   syntax fixes, con...
1329
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1330
      {
ca0cf29e   Steven   syntax fixes, con...
1331
1332
1333
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          std::string msg( nullptr == cause ? "cause unknown" : cause );
          cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } );
51becbde   Peter M. Groen   Committed the ent...
1334
1335
1336
1337
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1338
  void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message )
51becbde   Peter M. Groen   Committed the ent...
1339
  {
ca0cf29e   Steven   syntax fixes, con...
1340
1341
      (void) message;
      switch( level )
51becbde   Peter M. Groen   Committed the ent...
1342
1343
1344
      {
          case MQTTASYNC_TRACE_MAXIMUM:
          case MQTTASYNC_TRACE_MEDIUM:
ca0cf29e   Steven   syntax fixes, con...
1345
1346
          case MQTTASYNC_TRACE_MINIMUM:
          {
51becbde   Peter M. Groen   Committed the ent...
1347
1348
1349
              // ("ClientPaho", "paho - %1", message)
              break;
          }
ca0cf29e   Steven   syntax fixes, con...
1350
1351
          case MQTTASYNC_TRACE_PROTOCOL:
          {
51becbde   Peter M. Groen   Committed the ent...
1352
1353
1354
1355
1356
              // ("ClientPaho", "paho - %1", message)
              break;
          }
          case MQTTASYNC_TRACE_ERROR:
          case MQTTASYNC_TRACE_SEVERE:
ca0cf29e   Steven   syntax fixes, con...
1357
1358
          case MQTTASYNC_TRACE_FATAL:
          {
51becbde   Peter M. Groen   Committed the ent...
1359
1360
1361
1362
1363
              // ("ClientPaho", "paho - %1", message)
              break;
          }
      }
  }