Blame view

src/clientpaho.cpp 49 KB
b5d9e433   Peter M. Groen   Fixed License Hea...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  /* ****************************************************************************
   * Copyright 2019 Open Systems Development BV                                 *
   *                                                                            *
   * Permission is hereby granted, free of charge, to any person obtaining a    *
   * copy of this software and associated documentation files (the "Software"), *
   * to deal in the Software without restriction, including without limitation  *
   * the rights to use, copy, modify, merge, publish, distribute, sublicense,   *
   * and/or sell copies of the Software, and to permit persons to whom the      *
   * Software is furnished to do so, subject to the following conditions:       *
   *                                                                            *
   * The above copyright notice and this permission notice shall be included in *
   * all copies or substantial portions of the Software.                        *
   *                                                                            *
   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,   *
   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL    *
   * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING    *
   * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER        *
   * DEALINGS IN THE SOFTWARE.                                                  *
   * ***************************************************************************/
51becbde   Peter M. Groen   Committed the ent...
22
23
24
25
26
  #include "clientpaho.h"
  
  #include "errorcode.h"
  #include "mqttutil.h"
  #include "lockguard.h"
9421324b   Peter M. Groen   First fix on conn...
27
  #include "log.h"
51becbde   Peter M. Groen   Committed the ent...
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
  #include "metaprogrammingdefs.h"
  #include "mqttstream.h"
  #include "scopeguard.h"
  #include "uriparser.h"
  
  // std::chrono
  #include "compat-chrono.h"
  
  // std
  #include <algorithm>
  #include <iterator>
  
  using namespace osdev::components::mqtt;
  
  // File-local helpers that produce a correctly initialized MQTTAsync_disconnectOptions
  // regardless of which paho-c version (with or without the onSuccess5 member) is in use.
  namespace {
  
  // The helper templates below may end up unused in a given build; silence clang's warning about that.
  #if defined(__clang__)
  #pragma GCC diagnostic push
  #pragma GCC diagnostic ignored "-Wunused-template"
  #endif
  
  // Presumably generates the has_onSuccess5<T> trait used below (macro is defined elsewhere - verify in metaprogrammingdefs.h).
  OSDEV_COMPONENTS_HASMEMBER_TRAIT(onSuccess5)
  
  // Overload selected when TRet has no onSuccess5 member: the plain initializer macro is used.
  // The pointer argument is never read; it only carries the type.
  template <typename TRet>
  inline typename std::enable_if<!has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
  {
      return MQTTAsync_disconnectOptions_initializer;
  }
  
  // Overload selected when TRet does have an onSuccess5 member.
  // The pointer argument is never read; it only carries the type.
  template <typename TRet>
  inline typename std::enable_if<has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
  {
  // For some reason g++ on centos7 evaluates the function body even when it is discarded by SFINAE.
  // This leads to a compile error on an undefined symbol. We will use the old initializer macro, but this
  // method should not be chosen when the struct does not contain member onSuccess5!
  // On yocto warrior mqtt-paho-c 1.3.0 the macro MQTTAsync_disconnectOptions_initializer5 is not defined.
  // while the struct does have an onSuccess5 member. In that case we do need correct initializer code.
  // We fall back to the MQTTAsync_disconnectOptions_initializer macro and initialize
  // additional fields ourselves (which unfortunately results in a pesky compiler warning about missing field initializers).
  #ifndef MQTTAsync_disconnectOptions_initializer5
  #pragma GCC diagnostic push
  #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
      // Start from the old initializer, then set the version and the two "5" callbacks by hand.
      TRet ret = MQTTAsync_disconnectOptions_initializer;
      ret.struct_version = 1;
      ret.onSuccess5 = nullptr;
      ret.onFailure5 = nullptr;
      return ret;
  #pragma GCC diagnostic pop
  #else
      // The "5" initializer macro exists, so use it directly.
      return MQTTAsync_disconnectOptions_initializer5;
  #endif
  }
  
  // Small front-end so callers can write Init<T>::initialize() without passing the tag pointer themselves.
  template <typename TRet>
  struct Init
  {
      // Returns a TRet produced by the matching initializeMqttStruct overload.
      static TRet initialize()
      {
          return initializeMqttStruct<TRet>(static_cast<TRet*>(nullptr));
      }
  };
  #if defined(__clang__)
  #pragma GCC diagnostic pop
  #endif
  
  } // namespace
  
  // Count of live ClientPaho objects, starting at zero. The constructor increments it and the
  // destructor decrements it; the paho trace callback is installed/removed on the 0 <-> 1 transitions.
  std::atomic_int ClientPaho::s_numberOfInstances(0);
  
ca0cf29e   Steven   syntax fixes, con...
97
  // Builds a client for the given endpoint and client id.
  //
  // _endpoint                 endpoint description, handed to parseEndpoint().
  // _id                       client identifier, stored in m_clientId.
  // connectionStatusCallback  stored in m_connectionStatusCallback.
  // deliveryCompleteCallback  stored in m_deliveryCompleteCallback.
  //
  // The very first instance also registers the paho trace callback. The paho client handle is
  // created here; the callback worker thread is started only when that creation succeeds.
  ClientPaho::ClientPaho( const std::string& _endpoint,
      const std::string& _id,
      const std::function<void( const std::string&, ConnectionStatus )>& connectionStatusCallback,
      const std::function<void( const std::string& clientId, std::int32_t pubMsgToken )>& deliveryCompleteCallback )
      : m_mutex()
      , m_endpoint()
      , m_username()
      , m_password()
      , m_clientId(_id)
      , m_pendingOperations()
      , m_operationResult()
      , m_operationsCompleteCV()
      , m_subscriptions()
      , m_pendingSubscriptions()
      , m_subscribeTokenToTopic()
      , m_unsubscribeTokenToTopic()
      , m_pendingPublishes()
      , m_processPendingPublishes(false)
      , m_pendingPublishesReadyCV()
      , m_client()
      , m_connectionStatus(ConnectionStatus::Disconnected)
      , m_connectionStatusCallback(connectionStatusCallback)
      , m_deliveryCompleteCallback(deliveryCompleteCallback)
      , m_lastUnsubscribe(-1)
      , m_connectPromise()
      , m_disconnectPromise()
      , m_callbackEventQueue(m_clientId)
      , m_workerThread()
  {
      // Only the first live instance hooks up the paho trace output.
      const bool isFirstInstance = ( 0 == s_numberOfInstances++ );
      if( isFirstInstance )
      {
          MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho );
      }
  
      LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) );
      parseEndpoint(_endpoint);
  
      const auto createResult = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr );
      if( MQTTASYNC_SUCCESS != createResult )
      {
          // Without a client handle there is nothing to attach callbacks to and no worker to start.
          LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( createResult ) ) );
          return;
      }
  
      MQTTAsync_setCallbacks( m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete );
      m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this );
  }
  
  // Tears the client down: unsubscribes and disconnects when still connected, removes the paho
  // trace callback when this is the last instance, destroys the paho handle and finally stops
  // the callback queue and joins the worker thread.
  ClientPaho::~ClientPaho()
  {
      LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) );
  
      const bool stillConnected = MQTTAsync_isConnected( m_client );
      if( !stillConnected )
      {
          // If the status was already disconnected this call does nothing
          setConnectionStatus( ConnectionStatus::Disconnected );
      }
      else
      {
          unsubscribeAll();
  
          // Give outstanding operations up to two seconds, then disconnect and wait for it.
          waitForCompletion( std::chrono::milliseconds(2000), std::set<int32_t>{} );
          disconnect( true, 5000 );
      }
  
      const bool isLastInstance = ( 0 == --s_numberOfInstances );
      if( isLastInstance )
      {
          // encountered a case where termination of the logging system within paho led to a segfault.
          // This was a paho thread that was cleaned while at the same time the logging system was terminated.
          // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less
          // frequently.
          MQTTAsync_setTraceCallback( nullptr );
      }
  
      MQTTAsync_destroy( &m_client );
  
      // Stop the event queue first, then wait for the worker thread to finish.
      m_callbackEventQueue.stop();
      if( m_workerThread.joinable() )
      {
          m_workerThread.join();
      }
  }
  
  // Returns a copy of the client identifier stored in m_clientId.
  std::string ClientPaho::clientId() const
  {
      return m_clientId;
  }
  
  // Returns the current value of m_connectionStatus. No lock is taken here.
  ConnectionStatus ClientPaho::connectionStatus() const
  {
      return m_connectionStatus;
  }
  
31eece9b   Steven   added LWT ( last ...
190
  std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
51becbde   Peter M. Groen   Committed the ent...
191
192
193
  {
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
2b967ea7   Steven   promise is not wo...
194
          if( ConnectionStatus::Disconnected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
195
196
197
          {
              return -1;
          }
2b967ea7   Steven   promise is not wo...
198
          setConnectionStatus( ConnectionStatus::ConnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
199
200
      }
  
76d01373   Peter M. Groen   Fix on connection
201
202
      LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) );
  
51becbde   Peter M. Groen   Committed the ent...
203
      MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
9421324b   Peter M. Groen   First fix on conn...
204
      conn_opts.keepAliveInterval = 5;
51becbde   Peter M. Groen   Committed the ent...
205
      conn_opts.cleansession = 1;
2c0c99a5   Peter M. Groen   Fix connect Callb...
206
      conn_opts.onSuccess = nullptr;
51becbde   Peter M. Groen   Committed the ent...
207
208
209
      conn_opts.onFailure = &ClientPaho::onConnectFailure;
      conn_opts.context = this;
      conn_opts.automaticReconnect = 1;
31eece9b   Steven   added LWT ( last ...
210
  
76d01373   Peter M. Groen   Fix on connection
211
212
213
214
      // Make sure we get a signal if the promise is fulfilled
      auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect );
      if( MQTTASYNC_SUCCESS == ccb )
      {
ca0cf29e   Steven   syntax fixes, con...
215
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
76d01373   Peter M. Groen   Fix on connection
216
217
218
      }
      else
      {
ca0cf29e   Steven   syntax fixes, con...
219
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
76d01373   Peter M. Groen   Fix on connection
220
221
222
      }
  
      // Setup the last will and testament, if so desired.
31eece9b   Steven   added LWT ( last ...
223
224
225
226
227
228
229
      if( !lwt.topic().empty() )
      {
          MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
          will_opts.message = lwt.message().c_str();
          will_opts.topicName = lwt.topic().c_str();
  
          conn_opts.will = &will_opts;
76d01373   Peter M. Groen   Fix on connection
230
231
  
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) );
31eece9b   Steven   added LWT ( last ...
232
233
234
235
236
237
      }
      else
      {
          conn_opts.will = nullptr;
      }
  
2b967ea7   Steven   promise is not wo...
238
      if( !m_username.empty() )
51becbde   Peter M. Groen   Committed the ent...
239
240
241
242
      {
          conn_opts.username = m_username.c_str();
      }
  
2b967ea7   Steven   promise is not wo...
243
      if( !m_password.empty() )
51becbde   Peter M. Groen   Committed the ent...
244
245
246
247
248
249
250
      {
          conn_opts.password = m_password.c_str();
      }
  
      std::promise<void> waitForConnectPromise{};
      auto waitForConnect = waitForConnectPromise.get_future();
      m_connectPromise.reset();
2b967ea7   Steven   promise is not wo...
251
      if( wait )
51becbde   Peter M. Groen   Committed the ent...
252
      {
2b967ea7   Steven   promise is not wo...
253
          m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) );
51becbde   Peter M. Groen   Committed the ent...
254
255
256
      }
  
      {
2b967ea7   Steven   promise is not wo...
257
258
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
          if( !m_pendingOperations.insert( -100 ).second )
51becbde   Peter M. Groen   Committed the ent...
259
260
261
          {
              // Write something
          }
2b967ea7   Steven   promise is not wo...
262
          m_operationResult.erase( -100 );
51becbde   Peter M. Groen   Committed the ent...
263
264
      }
  
2b967ea7   Steven   promise is not wo...
265
266
      int rc = MQTTAsync_connect( m_client, &conn_opts );
      if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
267
      {
2b967ea7   Steven   promise is not wo...
268
269
          setConnectionStatus( ConnectionStatus::Disconnected );
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
270
271
272
273
          m_operationResult[-100] = false;
          m_pendingOperations.erase(-100);
      }
  
2b967ea7   Steven   promise is not wo...
274
      if( wait )
51becbde   Peter M. Groen   Committed the ent...
275
276
277
278
279
280
281
      {
          waitForConnect.get();
          m_connectPromise.reset();
      }
      return -100;
  }
  
0c424e03   Steven   syntax fixes. WIP
282
  std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
51becbde   Peter M. Groen   Committed the ent...
283
284
285
286
287
  {
      ConnectionStatus currentStatus = m_connectionStatus;
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
0c424e03   Steven   syntax fixes. WIP
288
289
          if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus )
          {
51becbde   Peter M. Groen   Committed the ent...
290
291
292
293
              return -1;
          }
  
          currentStatus = m_connectionStatus;
0c424e03   Steven   syntax fixes. WIP
294
          setConnectionStatus( ConnectionStatus::DisconnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
295
296
297
298
299
300
301
302
303
304
305
      }
  
      MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize();
      disconn_opts.timeout = timeoutMs;
      disconn_opts.onSuccess = &ClientPaho::onDisconnectSuccess;
      disconn_opts.onFailure = &ClientPaho::onDisconnectFailure;
      disconn_opts.context = this;
  
      std::promise<void> waitForDisconnectPromise{};
      auto waitForDisconnect = waitForDisconnectPromise.get_future();
      m_disconnectPromise.reset();
0c424e03   Steven   syntax fixes. WIP
306
307
      if( wait )
      {
51becbde   Peter M. Groen   Committed the ent...
308
309
310
311
312
          m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise));
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
313
          if( !m_pendingOperations.insert( -200 ).second )
51becbde   Peter M. Groen   Committed the ent...
314
          {
ca0cf29e   Steven   syntax fixes, con...
315
              LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
316
          }
ca0cf29e   Steven   syntax fixes, con...
317
          m_operationResult.erase( -200 );
51becbde   Peter M. Groen   Committed the ent...
318
319
      }
  
ca0cf29e   Steven   syntax fixes, con...
320
      int rc = MQTTAsync_disconnect( m_client, &disconn_opts );
0c424e03   Steven   syntax fixes. WIP
321
      if( MQTTASYNC_SUCCESS != rc )
76d01373   Peter M. Groen   Fix on connection
322
      {
0c424e03   Steven   syntax fixes. WIP
323
          if( MQTTASYNC_DISCONNECTED == rc )
76d01373   Peter M. Groen   Fix on connection
324
          {
51becbde   Peter M. Groen   Committed the ent...
325
326
327
              currentStatus = ConnectionStatus::Disconnected;
          }
  
0c424e03   Steven   syntax fixes. WIP
328
329
          setConnectionStatus( currentStatus );
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
330
          m_operationResult[-200] = false;
ca0cf29e   Steven   syntax fixes, con...
331
          m_pendingOperations.erase( -200 );
51becbde   Peter M. Groen   Committed the ent...
332
  
0c424e03   Steven   syntax fixes. WIP
333
          if( MQTTASYNC_DISCONNECTED == rc )
2b967ea7   Steven   promise is not wo...
334
          {
51becbde   Peter M. Groen   Committed the ent...
335
336
              return -1;
          }
ca0cf29e   Steven   syntax fixes, con...
337
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
338
339
      }
  
2b967ea7   Steven   promise is not wo...
340
341
      if( wait )
      {
51becbde   Peter M. Groen   Committed the ent...
342
343
          if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
          {
ca0cf29e   Steven   syntax fixes, con...
344
              LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) );
51becbde   Peter M. Groen   Committed the ent...
345
346
347
348
349
350
351
          }
          waitForDisconnect.get();
          m_disconnectPromise.reset();
      }
      return -200;
  }
  
ca0cf29e   Steven   syntax fixes, con...
352
  std::int32_t ClientPaho::publish( const MqttMessage& message, int qos )
51becbde   Peter M. Groen   Committed the ent...
353
  {
0c424e03   Steven   syntax fixes. WIP
354
      if( ConnectionStatus::DisconnectInProgress == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
355
      {
ca0cf29e   Steven   syntax fixes, con...
356
          LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) );
51becbde   Peter M. Groen   Committed the ent...
357
358
          return -1;
      }
0c424e03   Steven   syntax fixes. WIP
359
      else if( ConnectionStatus::Disconnected == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
360
      {
ca0cf29e   Steven   syntax fixes, con...
361
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) );
2b967ea7   Steven   promise is not wo...
362
          connect( true );
51becbde   Peter M. Groen   Committed the ent...
363
364
      }
  
0c424e03   Steven   syntax fixes. WIP
365
      if( !isValidTopic(message.topic() ) )
51becbde   Peter M. Groen   Committed the ent...
366
      {
ca0cf29e   Steven   syntax fixes, con...
367
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) );
51becbde   Peter M. Groen   Committed the ent...
368
369
      }
  
0c424e03   Steven   syntax fixes. WIP
370
      if( qos > 2 )
51becbde   Peter M. Groen   Committed the ent...
371
372
373
      {
          qos = 2;
      }
0c424e03   Steven   syntax fixes. WIP
374
      else if( qos < 0 )
51becbde   Peter M. Groen   Committed the ent...
375
376
377
378
      {
          qos = 0;
      }
  
51becbde   Peter M. Groen   Committed the ent...
379
      std::unique_lock<std::mutex> lck(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
380
      if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes )
a670240b   Peter M. Groen   Fix on connection
381
      {
51becbde   Peter M. Groen   Committed the ent...
382
          m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
ca0cf29e   Steven   syntax fixes, con...
383
          if( ConnectionStatus::ReconnectInProgress == m_connectionStatus )
a670240b   Peter M. Groen   Fix on connection
384
          {
0c424e03   Steven   syntax fixes. WIP
385
              LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." );
ca0cf29e   Steven   syntax fixes, con...
386
              m_pendingPublishes.push_front( Publish{ qos, message } );
51becbde   Peter M. Groen   Committed the ent...
387
388
389
390
              return -1;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
391
      return publishInternal( message, qos );
51becbde   Peter M. Groen   Committed the ent...
392
393
394
395
396
397
  }
  
  void ClientPaho::publishPending()
  {
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
0c424e03   Steven   syntax fixes. WIP
398
          if( !m_processPendingPublishes )
2b967ea7   Steven   promise is not wo...
399
          {
51becbde   Peter M. Groen   Committed the ent...
400
401
402
403
              return;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
404
      if( ConnectionStatus::Connected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
405
      {
a670240b   Peter M. Groen   Fix on connection
406
          LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) )
51becbde   Peter M. Groen   Committed the ent...
407
408
      }
  
0c424e03   Steven   syntax fixes. WIP
409
      while( !m_pendingPublishes.empty() )
51becbde   Peter M. Groen   Committed the ent...
410
411
      {
          const auto& pub = m_pendingPublishes.back();
ca0cf29e   Steven   syntax fixes, con...
412
          publishInternal( pub.data, pub.qos );
51becbde   Peter M. Groen   Committed the ent...
413
414
415
416
417
418
419
420
421
422
423
  
          m_pendingPublishes.pop_back();
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          m_processPendingPublishes = false;
      }
      m_pendingPublishesReadyCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
424
  std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb )
51becbde   Peter M. Groen   Committed the ent...
425
  {
0c424e03   Steven   syntax fixes. WIP
426
      if( ConnectionStatus::Connected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
427
      {
11fe0b09   Peter M. Groen   Rework for subscr...
428
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) );
51becbde   Peter M. Groen   Committed the ent...
429
430
      }
  
0c424e03   Steven   syntax fixes. WIP
431
      if( !isValidTopic( topic ) )
51becbde   Peter M. Groen   Committed the ent...
432
433
      {
          // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
11fe0b09   Peter M. Groen   Rework for subscr...
434
435
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) );
          return -1;
51becbde   Peter M. Groen   Committed the ent...
436
437
      }
  
0c424e03   Steven   syntax fixes. WIP
438
      if( qos > 2 )
51becbde   Peter M. Groen   Committed the ent...
439
440
441
      {
          qos = 2;
      }
0c424e03   Steven   syntax fixes. WIP
442
      else if( qos < 0 )
51becbde   Peter M. Groen   Committed the ent...
443
444
445
446
447
448
449
450
      {
          qos = 0;
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  
          auto itExisting = m_subscriptions.find(topic);
0c424e03   Steven   syntax fixes. WIP
451
452
453
454
          if( m_subscriptions.end() != itExisting )
          {
              if( itExisting->second.qos == qos )
              {
51becbde   Peter M. Groen   Committed the ent...
455
456
457
458
459
460
                  return -1;
              }
              // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic);
          }
  
          auto itPending = m_pendingSubscriptions.find(topic);
0c424e03   Steven   syntax fixes. WIP
461
462
463
464
465
466
467
          if( m_pendingSubscriptions.end() != itPending )
          {
              if( itPending->second.qos == qos )
              {
                  auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; } );
                  if( m_subscribeTokenToTopic.end() != itToken )
                  {
51becbde   Peter M. Groen   Committed the ent...
468
469
                      return itToken->first;
                  }
0c424e03   Steven   syntax fixes. WIP
470
471
                  else
                  {
51becbde   Peter M. Groen   Committed the ent...
472
473
474
475
476
477
478
                      return -1;
                  }
              }
              // (OverlappingTopicException, "pending subscription with same topic, but different qos", topic);
          }
  
          std::string existingTopic{};
0c424e03   Steven   syntax fixes. WIP
479
          if( isOverlappingInternal( topic, existingTopic ) )
51becbde   Peter M. Groen   Committed the ent...
480
481
          {
              // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
11fe0b09   Peter M. Groen   Rework for subscr...
482
              LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
483
484
          }
  
ca0cf29e   Steven   syntax fixes, con...
485
486
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
          m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) );
51becbde   Peter M. Groen   Committed the ent...
487
      }
0c424e03   Steven   syntax fixes. WIP
488
      return subscribeInternal( topic, qos );
51becbde   Peter M. Groen   Committed the ent...
489
490
491
492
  }
  
  void ClientPaho::resubscribe()
  {
ca0cf29e   Steven   syntax fixes, con...
493
      decltype( m_pendingSubscriptions ) pendingSubscriptions{};
51becbde   Peter M. Groen   Committed the ent...
494
495
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
496
          std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) );
51becbde   Peter M. Groen   Committed the ent...
497
498
      }
  
0c424e03   Steven   syntax fixes. WIP
499
      for( const auto& s : pendingSubscriptions )
51becbde   Peter M. Groen   Committed the ent...
500
      {
0c424e03   Steven   syntax fixes. WIP
501
          subscribeInternal( s.first, s.second.qos );
51becbde   Peter M. Groen   Committed the ent...
502
503
504
      }
  }
  
31eece9b   Steven   added LWT ( last ...
505
  std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos )
51becbde   Peter M. Groen   Committed the ent...
506
507
  {
      {
0c424e03   Steven   syntax fixes. WIP
508
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
509
          bool found = false;
0c424e03   Steven   syntax fixes. WIP
510
          for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
511
          {
0c424e03   Steven   syntax fixes. WIP
512
              if( topic == s.first && qos == s.second.qos )
51becbde   Peter M. Groen   Committed the ent...
513
514
515
516
517
              {
                  found = true;
                  break;
              }
          }
0c424e03   Steven   syntax fixes. WIP
518
          if( !found )
51becbde   Peter M. Groen   Committed the ent...
519
520
521
522
523
524
525
526
527
528
529
530
531
532
          {
              return -1;
          }
      }
  
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onUnsubscribeSuccess;
      opts.onFailure = &ClientPaho::onUnsubscribeFailure;
      opts.context = this;
  
      {
          // Need to lock the mutex because it is possible that the callback is faster than
          // the insertion of the token into the pending operations.
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
533
          auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts );
0c424e03   Steven   syntax fixes. WIP
534
          if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
535
          {
ca0cf29e   Steven   syntax fixes, con...
536
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
537
538
          }
  
0c424e03   Steven   syntax fixes. WIP
539
          if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
540
          {
ca0cf29e   Steven   syntax fixes, con...
541
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token )  + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
542
543
          }
  
0c424e03   Steven   syntax fixes. WIP
544
545
          m_operationResult.erase( opts.token );
          if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 )
51becbde   Peter M. Groen   Committed the ent...
546
          {
ca0cf29e   Steven   syntax fixes, con...
547
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic  " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
548
549
550
551
552
553
554
555
556
557
558
559
560
          }
          m_lastUnsubscribe = opts.token; // centos7 workaround
          m_unsubscribeTokenToTopic[opts.token] = topic;
      }
  
      // Because of a bug in paho-c on centos7 the unsubscribes need to be sequential (best effort).
      this->waitForCompletion(std::chrono::seconds(1), std::set<int32_t>{ opts.token });
  
      return opts.token;
  }
  
  void ClientPaho::unsubscribeAll()
  {
0c424e03   Steven   syntax fixes. WIP
561
      decltype( m_subscriptions ) subscriptions{};
51becbde   Peter M. Groen   Committed the ent...
562
563
564
565
566
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          subscriptions = m_subscriptions;
      }
  
0c424e03   Steven   syntax fixes. WIP
567
568
      for( const auto& s : subscriptions )
      {
ca0cf29e   Steven   syntax fixes, con...
569
          this->unsubscribe( s.first, s.second.qos );
51becbde   Peter M. Groen   Committed the ent...
570
571
572
573
574
      }
  }
  
  std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const
  {
ca0cf29e   Steven   syntax fixes, con...
575
576
577
      if( waitFor <= std::chrono::milliseconds( 0 ) )
      {
          return std::chrono::milliseconds( 0 );
51becbde   Peter M. Groen   Committed the ent...
578
579
580
      }
      std::chrono::milliseconds timeElapsed{};
      {
ca0cf29e   Steven   syntax fixes, con...
581
          osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds )
51becbde   Peter M. Groen   Committed the ent...
582
          {
ca0cf29e   Steven   syntax fixes, con...
583
              timeElapsed = std::chrono::ceil<std::chrono::milliseconds>( sinceStart );
51becbde   Peter M. Groen   Committed the ent...
584
585
          });
          std::unique_lock<std::mutex> lck(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
586
  
51becbde   Peter M. Groen   Committed the ent...
587
588
589
          // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations);
          m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]()
          {
ca0cf29e   Steven   syntax fixes, con...
590
              if( tokens.empty() )
51becbde   Peter M. Groen   Committed the ent...
591
592
593
              { // wait for all operations to end
                  return m_pendingOperations.empty();
              }
ca0cf29e   Steven   syntax fixes, con...
594
              else if( tokens.size() == 1 )
51becbde   Peter M. Groen   Committed the ent...
595
              {
ca0cf29e   Steven   syntax fixes, con...
596
                  return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end();
51becbde   Peter M. Groen   Committed the ent...
597
598
599
600
601
602
603
604
605
              }
              std::vector<std::int32_t> intersect{};
              std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect));
              return intersect.empty();
          } );
      }
      return timeElapsed;
  }
  
ca0cf29e   Steven   syntax fixes, con...
606
  bool ClientPaho::isOverlapping( const std::string& topic ) const
51becbde   Peter M. Groen   Committed the ent...
607
608
  {
      std::string existingTopic{};
ca0cf29e   Steven   syntax fixes, con...
609
      return isOverlapping( topic, existingTopic );
51becbde   Peter M. Groen   Committed the ent...
610
611
  }
  
ca0cf29e   Steven   syntax fixes, con...
612
  bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const
51becbde   Peter M. Groen   Committed the ent...
613
614
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
615
      return isOverlappingInternal( topic, existingTopic );
51becbde   Peter M. Groen   Committed the ent...
616
617
618
619
620
621
622
  }
  
  // Returns a copy of the tokens in m_pendingOperations, taken while m_mutex is held.
  // The order of the result is the iteration order of m_pendingOperations.
  std::vector<std::int32_t> ClientPaho::pendingOperations() const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      return std::vector<std::int32_t>( m_pendingOperations.begin(), m_pendingOperations.end() );
  }
  
  // Reports, under m_mutex, whether m_pendingSubscriptions holds at least one entry.
  bool ClientPaho::hasPendingSubscriptions() const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      const bool nonePending = m_pendingSubscriptions.empty();
      return !nonePending;
  }
  
ca0cf29e   Steven   syntax fixes, con...
633
  // Looks up the stored result of the operation identified by token (under m_mutex).
  // Returns an empty optional when m_operationResult has no entry for the token,
  // otherwise the stored value.
  boost::optional<bool> ClientPaho::operationResult( std::int32_t token ) const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      boost::optional<bool> outcome{};
      const auto found = m_operationResult.find( token );
      if( found != m_operationResult.end() )
      {
          outcome = found->second;
      }
      return outcome;
  }
  
ca0cf29e   Steven   syntax fixes, con...
645
  void ClientPaho::parseEndpoint( const std::string& _endpoint )
51becbde   Peter M. Groen   Committed the ent...
646
  {
ca0cf29e   Steven   syntax fixes, con...
647
648
      auto ep = UriParser::parse( _endpoint );
      if( ep.find( "user" ) != ep.end() )
51becbde   Peter M. Groen   Committed the ent...
649
650
651
652
653
      {
          m_username = ep["user"];
          ep["user"].clear();
      }
  
ca0cf29e   Steven   syntax fixes, con...
654
      if( ep.find( "password" ) != ep.end() )
51becbde   Peter M. Groen   Committed the ent...
655
656
657
658
      {
          m_password = ep["password"];
          ep["password"].clear();
      }
ca0cf29e   Steven   syntax fixes, con...
659
      m_endpoint = UriParser::toString( ep );
51becbde   Peter M. Groen   Committed the ent...
660
661
  }
  
ca0cf29e   Steven   syntax fixes, con...
662
  std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos )
51becbde   Peter M. Groen   Committed the ent...
663
664
665
666
667
668
669
670
671
672
673
674
  {
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onPublishSuccess;
      opts.onFailure = &ClientPaho::onPublishFailure;
      opts.context = this;
      auto msg = message.toAsyncMessage();
      msg.qos = qos;
  
      // Need to lock the mutex because it is possible that the callback is faster than
      // the insertion of the token into the pending operations.
  
      // OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
675
      auto rc = MQTTAsync_sendMessage( m_client, message.topic().c_str(), &msg, &opts );
ca0cf29e   Steven   syntax fixes, con...
676
      if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
677
      {
ca0cf29e   Steven   syntax fixes, con...
678
          LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
679
680
      }
  
ca0cf29e   Steven   syntax fixes, con...
681
      if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
682
      {
ca0cf29e   Steven   syntax fixes, con...
683
          LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
684
      }
ca0cf29e   Steven   syntax fixes, con...
685
      m_operationResult.erase( opts.token );
51becbde   Peter M. Groen   Committed the ent...
686
687
688
      return opts.token;
  }
  
ca0cf29e   Steven   syntax fixes, con...
689
  std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos )
51becbde   Peter M. Groen   Committed the ent...
690
691
692
693
694
695
696
697
698
  {
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onSubscribeSuccess;
      opts.onFailure = &ClientPaho::onSubscribeFailure;
      opts.context = this;
  
      // Need to lock the mutex because it is possible that the callback is faster than
      // the insertion of the token into the pending operations.
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
699
      auto rc = MQTTAsync_subscribe( m_client, topic.c_str(), qos, &opts );
51becbde   Peter M. Groen   Committed the ent...
700
701
      if (MQTTASYNC_SUCCESS != rc)
      {
ca0cf29e   Steven   syntax fixes, con...
702
703
          m_pendingSubscriptions.erase( topic );
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
704
705
      }
  
ca0cf29e   Steven   syntax fixes, con...
706
      if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
707
      {
ca0cf29e   Steven   syntax fixes, con...
708
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
709
      }
ca0cf29e   Steven   syntax fixes, con...
710
711
      m_operationResult.erase( opts.token );
      if( m_subscribeTokenToTopic.count( opts.token ) > 0 )
51becbde   Peter M. Groen   Committed the ent...
712
      {
ca0cf29e   Steven   syntax fixes, con...
713
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
714
715
716
717
718
      }
      m_subscribeTokenToTopic[opts.token] = topic;
      return opts.token;
  }
  
0c424e03   Steven   syntax fixes. WIP
719
  // Stores the new connection status.
  // When the status differs from the previous one and a status callback is set, the callback is
  // invoked with the client id and the new status.
  void ClientPaho::setConnectionStatus( ConnectionStatus status )
  {
      const ConnectionStatus previous = m_connectionStatus;
      m_connectionStatus = status;

      if( previous == status )
      {
          return;
      }
      if( m_connectionStatusCallback )
      {
          m_connectionStatusCallback( m_clientId, status );
      }
  }
  
0c424e03   Steven   syntax fixes. WIP
729
  bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const
51becbde   Peter M. Groen   Committed the ent...
730
731
  {
      existingTopic.clear();
0c424e03   Steven   syntax fixes. WIP
732
      for( const auto& s : m_pendingSubscriptions )
51becbde   Peter M. Groen   Committed the ent...
733
      {
0c424e03   Steven   syntax fixes. WIP
734
          if( testForOverlap( s.first, topic ) )
51becbde   Peter M. Groen   Committed the ent...
735
736
737
738
739
740
          {
              existingTopic = s.first;
              return true;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
741
      for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
742
      {
ed30280f   Steven   last changes.
743
          if( testForOverlap( s.first, topic ) )
51becbde   Peter M. Groen   Committed the ent...
744
745
746
747
748
749
750
751
          {
              existingTopic = s.first;
              return true;
          }
      }
      return false;
  }
  
ed30280f   Steven   last changes.
752
  void ClientPaho::pushIncomingEvent( std::function<void()> ev )
51becbde   Peter M. Groen   Committed the ent...
753
  {
ed30280f   Steven   last changes.
754
      m_callbackEventQueue.push( ev );
51becbde   Peter M. Groen   Committed the ent...
755
756
757
758
  }
  
  void ClientPaho::callbackEventHandler()
  {
ca0cf29e   Steven   syntax fixes, con...
759
      LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) );
0c424e03   Steven   syntax fixes. WIP
760
761
      for( ;; )
      {
51becbde   Peter M. Groen   Committed the ent...
762
          std::vector<std::function<void()>> events;
0c424e03   Steven   syntax fixes. WIP
763
          if( !m_callbackEventQueue.pop(events) )
51becbde   Peter M. Groen   Committed the ent...
764
765
766
767
          {
              break;
          }
  
0c424e03   Steven   syntax fixes. WIP
768
          for( const auto& ev : events )
51becbde   Peter M. Groen   Committed the ent...
769
770
          {
              ev();
51becbde   Peter M. Groen   Committed the ent...
771
772
          }
      }
ed30280f   Steven   last changes.
773
      LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - leaving callback event handler" ) );
51becbde   Peter M. Groen   Committed the ent...
774
  }
0c424e03   Steven   syntax fixes. WIP
775
  void ClientPaho::onConnectOnInstance( const std::string& cause )
51becbde   Peter M. Groen   Committed the ent...
776
  {
ca0cf29e   Steven   syntax fixes, con...
777
      (void) cause;
51becbde   Peter M. Groen   Committed the ent...
778
779
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
780
          std::copy( m_subscriptions.begin(), m_subscriptions.end(), std::inserter( m_pendingSubscriptions, m_pendingSubscriptions.end() ) );
51becbde   Peter M. Groen   Committed the ent...
781
782
783
784
          m_subscriptions.clear();
          m_processPendingPublishes = true; // all publishes are on hold until publishPending is called.
      }
  
ed30280f   Steven   last changes.
785
      setConnectionStatus( ConnectionStatus::Connected );
51becbde   Peter M. Groen   Committed the ent...
786
787
  }
  
2c0c99a5   Peter M. Groen   Fix connect Callb...
788
  void ClientPaho::onConnectSuccessOnInstance()
51becbde   Peter M. Groen   Committed the ent...
789
  {
51becbde   Peter M. Groen   Committed the ent...
790
791
792
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // Register the connect callback that is used in reconnect scenarios.
0c424e03   Steven   syntax fixes. WIP
793
794
          auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect );
          if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
795
          {
0c424e03   Steven   syntax fixes. WIP
796
              LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
51becbde   Peter M. Groen   Committed the ent...
797
          }
2c0c99a5   Peter M. Groen   Fix connect Callb...
798
  
51becbde   Peter M. Groen   Committed the ent...
799
800
801
802
803
804
          // For MQTTV5
          //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
          //if (MQTTASYNC_SUCCESS != rc) {
          //    // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
          //}
          // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
2c0c99a5   Peter M. Groen   Fix connect Callb...
805
  
51becbde   Peter M. Groen   Committed the ent...
806
807
808
          m_operationResult[-100] = true;
          m_pendingOperations.erase(-100);
      }
9421324b   Peter M. Groen   First fix on conn...
809
  
0c424e03   Steven   syntax fixes. WIP
810
811
      setConnectionStatus( ConnectionStatus::Connected );
      if( m_connectPromise )
51becbde   Peter M. Groen   Committed the ent...
812
      {
ca0cf29e   Steven   syntax fixes, con...
813
          LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) );
51becbde   Peter M. Groen   Committed the ent...
814
815
816
817
818
          m_connectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
819
  void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
820
  {
0c424e03   Steven   syntax fixes. WIP
821
822
      (void) response;
      LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
51becbde   Peter M. Groen   Committed the ent...
823
824
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
51becbde   Peter M. Groen   Committed the ent...
825
826
827
828
          // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
          m_operationResult[-100] = false;
          m_pendingOperations.erase(-100);
      }
ca0cf29e   Steven   syntax fixes, con...
829
      if( ConnectionStatus::ConnectInProgress == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
830
      {
ca0cf29e   Steven   syntax fixes, con...
831
          setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
832
833
834
835
836
837
838
839
840
      }
      m_operationsCompleteCV.notify_all();
  }
  
  //void ClientPaho::onDisconnectOnInstance(enum MQTTReasonCodes reasonCode)
  //{
  //    MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode));
  //}
  
ca0cf29e   Steven   syntax fixes, con...
841
  void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& )
51becbde   Peter M. Groen   Committed the ent...
842
  {
ca0cf29e   Steven   syntax fixes, con...
843
      LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) );
51becbde   Peter M. Groen   Committed the ent...
844
845
846
847
848
849
850
851
852
853
854
855
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          m_subscriptions.clear();
          m_pendingSubscriptions.clear();
          m_subscribeTokenToTopic.clear();
          m_unsubscribeTokenToTopic.clear();
  
          // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - pending operations : %2, removing all operations", m_clientId, m_pendingOperations);
          m_operationResult[-200] = true;
          m_pendingOperations.clear();
      }
  
0c424e03   Steven   syntax fixes. WIP
856
      setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
857
  
0c424e03   Steven   syntax fixes. WIP
858
859
      if( m_disconnectPromise )
      {
51becbde   Peter M. Groen   Committed the ent...
860
861
862
863
864
          m_disconnectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
865
  void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
866
  {
0c424e03   Steven   syntax fixes. WIP
867
      (void) response;
ca0cf29e   Steven   syntax fixes, con...
868
      LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) );
51becbde   Peter M. Groen   Committed the ent...
869
      {
ca0cf29e   Steven   syntax fixes, con...
870
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);        
51becbde   Peter M. Groen   Committed the ent...
871
872
873
874
875
          // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations);
          m_operationResult[-200] = false;
          m_pendingOperations.erase(-200);
      }
  
0c424e03   Steven   syntax fixes. WIP
876
      if( MQTTAsync_isConnected( m_client ) )
51becbde   Peter M. Groen   Committed the ent...
877
      {
0c424e03   Steven   syntax fixes. WIP
878
          setConnectionStatus( ConnectionStatus::Connected );
51becbde   Peter M. Groen   Committed the ent...
879
880
881
      }
      else
      {
0c424e03   Steven   syntax fixes. WIP
882
          setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
883
884
      }
  
0c424e03   Steven   syntax fixes. WIP
885
      if( m_disconnectPromise )
51becbde   Peter M. Groen   Committed the ent...
886
887
888
889
890
891
      {
          m_disconnectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
892
  void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
893
894
  {
      auto pd = response.publishData();
ca0cf29e   Steven   syntax fixes, con...
895
      LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
51becbde   Peter M. Groen   Committed the ent...
896
897
898
899
900
901
902
903
904
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = true;
          m_pendingOperations.erase(response.token());
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
905
  void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
906
  {
ca0cf29e   Steven   syntax fixes, con...
907
      LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
51becbde   Peter M. Groen   Committed the ent...
908
909
910
911
912
913
914
915
916
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
          m_pendingOperations.erase(response.token());
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
917
  void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
918
  {
ca0cf29e   Steven   syntax fixes, con...
919
920
921
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) );
  
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
51becbde   Peter M. Groen   Committed the ent...
922
923
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      bool operationOk = false;
ca0cf29e   Steven   syntax fixes, con...
924
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]()
51becbde   Peter M. Groen   Committed the ent...
925
926
927
      {
          // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = operationOk;
ca0cf29e   Steven   syntax fixes, con...
928
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
929
      });
ca0cf29e   Steven   syntax fixes, con...
930
931
      auto it = m_subscribeTokenToTopic.find( response.token() );
      if( m_subscribeTokenToTopic.end() == it )
0c424e03   Steven   syntax fixes. WIP
932
      {
ca0cf29e   Steven   syntax fixes, con...
933
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
934
935
936
937
938
          return;
      }
      auto topic = it->second;
      m_subscribeTokenToTopic.erase(it);
  
ca0cf29e   Steven   syntax fixes, con...
939
940
      auto pendingIt = m_pendingSubscriptions.find( topic );
      if( m_pendingSubscriptions.end() == pendingIt )
51becbde   Peter M. Groen   Committed the ent...
941
      {
ca0cf29e   Steven   syntax fixes, con...
942
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
943
944
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
945
      if( response.qos() != pendingIt->second.qos )
51becbde   Peter M. Groen   Committed the ent...
946
      {
ca0cf29e   Steven   syntax fixes, con...
947
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) );
51becbde   Peter M. Groen   Committed the ent...
948
      }
ca0cf29e   Steven   syntax fixes, con...
949
950
951
952
  
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) );
      m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) );
      m_pendingSubscriptions.erase( pendingIt );
51becbde   Peter M. Groen   Committed the ent...
953
954
955
      operationOk = true;
  }
  
ca0cf29e   Steven   syntax fixes, con...
956
  void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
957
  {
ca0cf29e   Steven   syntax fixes, con...
958
959
960
      LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
  
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
961
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
962
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
51becbde   Peter M. Groen   Committed the ent...
963
964
965
      {
          // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
ca0cf29e   Steven   syntax fixes, con...
966
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
967
968
      });
  
ca0cf29e   Steven   syntax fixes, con...
969
970
      auto it = m_subscribeTokenToTopic.find( response.token() );
      if( m_subscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
971
      {
ca0cf29e   Steven   syntax fixes, con...
972
          LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
973
974
975
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
976
      m_subscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
977
  
ca0cf29e   Steven   syntax fixes, con...
978
979
      auto pendingIt = m_pendingSubscriptions.find( topic );
      if( m_pendingSubscriptions.end() == pendingIt )
51becbde   Peter M. Groen   Committed the ent...
980
      {
ca0cf29e   Steven   syntax fixes, con...
981
          LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
982
983
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
984
985
      LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) );
      m_pendingSubscriptions.erase( pendingIt );
51becbde   Peter M. Groen   Committed the ent...
986
987
  }
  
ca0cf29e   Steven   syntax fixes, con...
988
  void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
989
  {
ca0cf29e   Steven   syntax fixes, con...
990
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) );
51becbde   Peter M. Groen   Committed the ent...
991
  
ca0cf29e   Steven   syntax fixes, con...
992
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
993
994
995
996
997
998
999
1000
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  
      // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token.
      // As a workaround the last unsubscribe token is stored and is used when no valid token is available.
      // This is by no means bullet proof because rapid unsubscribes in succession will overwrite this member
      // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled
      // sequentially (see ClientPaho::unsubscribe)!
      auto token = response.token();
ca0cf29e   Steven   syntax fixes, con...
1001
      if( -1 == token )
51becbde   Peter M. Groen   Committed the ent...
1002
1003
1004
1005
1006
1007
      {
          token = m_lastUnsubscribe;
          m_lastUnsubscribe = -1;
      }
  
      bool operationOk = false;
ca0cf29e   Steven   syntax fixes, con...
1008
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]()
51becbde   Peter M. Groen   Committed the ent...
1009
1010
1011
      {
          // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token);
          m_operationResult[token] = operationOk;
ca0cf29e   Steven   syntax fixes, con...
1012
          m_pendingOperations.erase( token );
51becbde   Peter M. Groen   Committed the ent...
1013
1014
      });
  
ca0cf29e   Steven   syntax fixes, con...
1015
1016
      auto it = m_unsubscribeTokenToTopic.find( token );
      if( m_unsubscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
1017
      {
ca0cf29e   Steven   syntax fixes, con...
1018
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) );
51becbde   Peter M. Groen   Committed the ent...
1019
1020
1021
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
1022
      m_unsubscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
1023
  
ca0cf29e   Steven   syntax fixes, con...
1024
1025
1026
1027
      auto registeredIt = m_subscriptions.find( topic );
      if( m_subscriptions.end() == registeredIt )
      {
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1028
1029
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
1030
1031
1032
  
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) );
      m_subscriptions.erase( registeredIt );
51becbde   Peter M. Groen   Committed the ent...
1033
1034
1035
      operationOk = true;
  }
  
ca0cf29e   Steven   syntax fixes, con...
1036
  void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
1037
  {
ca0cf29e   Steven   syntax fixes, con...
1038
1039
      LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
1040
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
1041
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
51becbde   Peter M. Groen   Committed the ent...
1042
1043
1044
      {
          // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
ca0cf29e   Steven   syntax fixes, con...
1045
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
1046
1047
      });
  
ca0cf29e   Steven   syntax fixes, con...
1048
1049
      auto it = m_unsubscribeTokenToTopic.find( response.token() );
      if( m_unsubscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
1050
      {
ca0cf29e   Steven   syntax fixes, con...
1051
          LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1052
1053
1054
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
1055
      m_unsubscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
1056
1057
  }
  
ca0cf29e   Steven   syntax fixes, con...
1058
  int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message )
51becbde   Peter M. Groen   Committed the ent...
1059
  {
ca0cf29e   Steven   syntax fixes, con...
1060
      LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1061
  
ca0cf29e   Steven   syntax fixes, con...
1062
      std::function<void( MqttMessage )> cb;
51becbde   Peter M. Groen   Committed the ent...
1063
1064
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
1065
          for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
1066
          {
ca0cf29e   Steven   syntax fixes, con...
1067
              if( boost::regex_match( message.topic(), s.second.topicRegex ) )
51becbde   Peter M. Groen   Committed the ent...
1068
1069
1070
1071
1072
1073
              {
                  cb = s.second.callback;
              }
          }
      }
  
ca0cf29e   Steven   syntax fixes, con...
1074
      if( cb )
51becbde   Peter M. Groen   Committed the ent...
1075
      {
ca0cf29e   Steven   syntax fixes, con...
1076
          cb( message );
51becbde   Peter M. Groen   Committed the ent...
1077
1078
1079
      }
      else
      {
ca0cf29e   Steven   syntax fixes, con...
1080
          LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) );
51becbde   Peter M. Groen   Committed the ent...
1081
1082
1083
1084
      }
      return 1;
  }
  
ca0cf29e   Steven   syntax fixes, con...
1085
  // Logs that the message identified by `token` was delivered and, when a
  // delivery-complete callback is set, forwards the client id and token to it.
  void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token )
  {
      LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]",
                std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) );

      if( !m_deliveryCompleteCallback )
      {
          // Nobody is interested in delivery notifications.
          return;
      }

      m_deliveryCompleteCallback( m_clientId, static_cast<std::int32_t>( token ) );
  }
  
ca0cf29e   Steven   syntax fixes, con...
1094
  void ClientPaho::onConnectionLostOnInstance( const std::string& cause )
51becbde   Peter M. Groen   Committed the ent...
1095
  {
ca0cf29e   Steven   syntax fixes, con...
1096
      (void) cause;
51becbde   Peter M. Groen   Committed the ent...
1097
      // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause);
ca0cf29e   Steven   syntax fixes, con...
1098
      setConnectionStatus( ConnectionStatus::ReconnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
1099
1100
1101
  
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      // Remove all tokens related to subscriptions from the active operations.
ca0cf29e   Steven   syntax fixes, con...
1102
      for( const auto& p : m_subscribeTokenToTopic )
51becbde   Peter M. Groen   Committed the ent...
1103
1104
      {
          // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
ca0cf29e   Steven   syntax fixes, con...
1105
          m_pendingOperations.erase( p.first );
51becbde   Peter M. Groen   Committed the ent...
1106
1107
      }
  
ca0cf29e   Steven   syntax fixes, con...
1108
      for( const auto& p : m_unsubscribeTokenToTopic )
51becbde   Peter M. Groen   Committed the ent...
1109
1110
      {
          // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
ca0cf29e   Steven   syntax fixes, con...
1111
          m_pendingOperations.erase( p.first );
51becbde   Peter M. Groen   Committed the ent...
1112
1113
1114
1115
1116
1117
1118
      }
      // Clear the administration used in the subscribe process.
      m_subscribeTokenToTopic.clear();
      m_unsubscribeTokenToTopic.clear();
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1119
  void ClientPaho::onFirstConnect( void* context, char* cause )
9421324b   Peter M. Groen   First fix on conn...
1120
1121
  {
      LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1122
      if( context )
9421324b   Peter M. Groen   First fix on conn...
1123
      {
ca0cf29e   Steven   syntax fixes, con...
1124
1125
1126
          auto *cl = reinterpret_cast<ClientPaho*>( context );
          std::string reason( nullptr == cause ? "Unknown cause" : cause );
          cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } );
9421324b   Peter M. Groen   First fix on conn...
1127
1128
1129
      }
  }
  
ca0cf29e   Steven   syntax fixes, con...
1130
  void ClientPaho::onConnect( void* context, char* cause )
51becbde   Peter M. Groen   Committed the ent...
1131
  {
9421324b   Peter M. Groen   First fix on conn...
1132
      LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1133
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1134
      {
ca0cf29e   Steven   syntax fixes, con...
1135
1136
1137
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          std::string reason( nullptr == cause ? "unknown cause" : cause );
          cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } );
51becbde   Peter M. Groen   Committed the ent...
1138
1139
1140
1141
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1142
  void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1143
  {
2c0c99a5   Peter M. Groen   Fix connect Callb...
1144
      LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1145
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1146
      {
ca0cf29e   Steven   syntax fixes, con...
1147
1148
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
76d01373   Peter M. Groen   Fix on connection
1149
          {
51becbde   Peter M. Groen   Committed the ent...
1150
              // connect should always have a valid response struct.
ca0cf29e   Steven   syntax fixes, con...
1151
              LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" );
2c0c99a5   Peter M. Groen   Fix connect Callb...
1152
              return;
51becbde   Peter M. Groen   Committed the ent...
1153
          }
2c0c99a5   Peter M. Groen   Fix connect Callb...
1154
          // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
ca0cf29e   Steven   syntax fixes, con...
1155
          cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } );
51becbde   Peter M. Groen   Committed the ent...
1156
1157
1158
1159
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1160
  void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1161
  {
ca0cf29e   Steven   syntax fixes, con...
1162
1163
      LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) );
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1164
      {
ca0cf29e   Steven   syntax fixes, con...
1165
1166
1167
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
      }
  }
  
  //// static
  //void ClientPaho::onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode)
  //{
  //    apply_unused_parameters(properties);
  //    try {
  //        if (context) {
  //            auto* cl = reinterpret_cast<ClientPaho*>(context);
  //            cl->pushIncomingEvent([cl, reasonCode]() { cl->onDisconnectOnInstance(reasonCode); });
  //        }
  //    }
  //    catch (...) {
  //    }
  //    catch (const std::exception& e) {
  //        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - exception : %1", e.what());
  //    }
  //    catch (...) {
  //        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - unknown exception");
  //    }
  //}
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1192
  void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1193
  {
ca0cf29e   Steven   syntax fixes, con...
1194
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1195
      {
ca0cf29e   Steven   syntax fixes, con...
1196
1197
1198
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttSuccess resp( response ? response->token : 0 );
          cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1199
1200
1201
1202
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1203
  void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1204
  {
9421324b   Peter M. Groen   First fix on conn...
1205
      LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1206
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1207
      {
ca0cf29e   Steven   syntax fixes, con...
1208
1209
1210
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1211
1212
1213
1214
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1215
  void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1216
  {
ca0cf29e   Steven   syntax fixes, con...
1217
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1218
      {
ca0cf29e   Steven   syntax fixes, con...
1219
1220
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
51becbde   Peter M. Groen   Committed the ent...
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
          {
              // publish should always have a valid response struct.
              // toLogFile ("ClientPaho", "onPublishSuccess - no response data");
          }
          MqttSuccess resp(response->token, MqttMessage(response->alt.pub.destinationName == nullptr ? "null" : response->alt.pub.destinationName, response->alt.pub.message));
          cl->pushIncomingEvent([cl, resp]() { cl->onPublishSuccessOnInstance(resp); });
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1231
  void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1232
  {
ca0cf29e   Steven   syntax fixes, con...
1233
1234
      (void) response;
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1235
      {
ca0cf29e   Steven   syntax fixes, con...
1236
1237
1238
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1239
1240
1241
1242
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1243
  void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1244
  {
ca0cf29e   Steven   syntax fixes, con...
1245
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1246
      {
ca0cf29e   Steven   syntax fixes, con...
1247
1248
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
51becbde   Peter M. Groen   Committed the ent...
1249
1250
1251
1252
          {
              // subscribe should always have a valid response struct.
              // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data");
          }
ca0cf29e   Steven   syntax fixes, con...
1253
1254
          MqttSuccess resp( response->token, response->alt.qos );
          cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1255
1256
1257
1258
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1259
  void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1260
  {
ca0cf29e   Steven   syntax fixes, con...
1261
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1262
      {
ca0cf29e   Steven   syntax fixes, con...
1263
1264
1265
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1266
1267
1268
1269
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1270
  void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1271
  {
ca0cf29e   Steven   syntax fixes, con...
1272
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1273
      {
ca0cf29e   Steven   syntax fixes, con...
1274
1275
1276
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttSuccess resp( response ? response->token : -1 );
          cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1277
1278
1279
1280
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1281
  void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1282
  {
ca0cf29e   Steven   syntax fixes, con...
1283
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1284
      {
ca0cf29e   Steven   syntax fixes, con...
1285
1286
1287
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1288
1289
1290
1291
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1292
  int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message )
51becbde   Peter M. Groen   Committed the ent...
1293
1294
  {
  
ca0cf29e   Steven   syntax fixes, con...
1295
      OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]()
51becbde   Peter M. Groen   Committed the ent...
1296
      {
ca0cf29e   Steven   syntax fixes, con...
1297
1298
          MQTTAsync_freeMessage( &message );
          MQTTAsync_free( topicName );
51becbde   Peter M. Groen   Committed the ent...
1299
1300
      });
  
ca0cf29e   Steven   syntax fixes, con...
1301
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1302
      {
ca0cf29e   Steven   syntax fixes, con...
1303
1304
1305
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttMessage msg( topicName, *message );
          cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } );
51becbde   Peter M. Groen   Committed the ent...
1306
1307
1308
1309
1310
1311
      }
  
      return 1; // always return true. Otherwise this callback is triggered again.
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1312
  void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token )
51becbde   Peter M. Groen   Committed the ent...
1313
  {
ca0cf29e   Steven   syntax fixes, con...
1314
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1315
      {
ca0cf29e   Steven   syntax fixes, con...
1316
1317
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } );
51becbde   Peter M. Groen   Committed the ent...
1318
1319
1320
1321
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1322
  void ClientPaho::onConnectionLost( void* context, char* cause )
51becbde   Peter M. Groen   Committed the ent...
1323
  {
ca0cf29e   Steven   syntax fixes, con...
1324
      OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]()
51becbde   Peter M. Groen   Committed the ent...
1325
      {
ca0cf29e   Steven   syntax fixes, con...
1326
          if( cause )
51becbde   Peter M. Groen   Committed the ent...
1327
          {
ca0cf29e   Steven   syntax fixes, con...
1328
              MQTTAsync_free( cause );
51becbde   Peter M. Groen   Committed the ent...
1329
1330
1331
          }
      });
  
ca0cf29e   Steven   syntax fixes, con...
1332
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1333
      {
ca0cf29e   Steven   syntax fixes, con...
1334
1335
1336
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          std::string msg( nullptr == cause ? "cause unknown" : cause );
          cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } );
51becbde   Peter M. Groen   Committed the ent...
1337
1338
1339
1340
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1341
  void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message )
51becbde   Peter M. Groen   Committed the ent...
1342
  {
ca0cf29e   Steven   syntax fixes, con...
1343
1344
      (void) message;
      switch( level )
51becbde   Peter M. Groen   Committed the ent...
1345
1346
1347
      {
          case MQTTASYNC_TRACE_MAXIMUM:
          case MQTTASYNC_TRACE_MEDIUM:
ca0cf29e   Steven   syntax fixes, con...
1348
1349
          case MQTTASYNC_TRACE_MINIMUM:
          {
51becbde   Peter M. Groen   Committed the ent...
1350
1351
1352
              // ("ClientPaho", "paho - %1", message)
              break;
          }
ca0cf29e   Steven   syntax fixes, con...
1353
1354
          case MQTTASYNC_TRACE_PROTOCOL:
          {
51becbde   Peter M. Groen   Committed the ent...
1355
1356
1357
1358
1359
              // ("ClientPaho", "paho - %1", message)
              break;
          }
          case MQTTASYNC_TRACE_ERROR:
          case MQTTASYNC_TRACE_SEVERE:
ca0cf29e   Steven   syntax fixes, con...
1360
1361
          case MQTTASYNC_TRACE_FATAL:
          {
51becbde   Peter M. Groen   Committed the ent...
1362
1363
1364
1365
1366
              // ("ClientPaho", "paho - %1", message)
              break;
          }
      }
  }