Blame view

src/clientpaho.cpp 49.5 KB
b5d9e433   Peter M. Groen   Fixed License Hea...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  /* ****************************************************************************
   * Copyright 2019 Open Systems Development BV                                 *
   *                                                                            *
   * Permission is hereby granted, free of charge, to any person obtaining a    *
   * copy of this software and associated documentation files (the "Software"), *
   * to deal in the Software without restriction, including without limitation  *
   * the rights to use, copy, modify, merge, publish, distribute, sublicense,   *
   * and/or sell copies of the Software, and to permit persons to whom the      *
   * Software is furnished to do so, subject to the following conditions:       *
   *                                                                            *
   * The above copyright notice and this permission notice shall be included in *
   * all copies or substantial portions of the Software.                        *
   *                                                                            *
   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,   *
   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL    *
   * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING    *
   * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER        *
   * DEALINGS IN THE SOFTWARE.                                                  *
   * ***************************************************************************/
51becbde   Peter M. Groen   Committed the ent...
22
23
24
25
26
  #include "clientpaho.h"
  
  #include "errorcode.h"
  #include "mqttutil.h"
  #include "lockguard.h"
9421324b   Peter M. Groen   First fix on conn...
27
  #include "log.h"
51becbde   Peter M. Groen   Committed the ent...
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
  #include "metaprogrammingdefs.h"
  #include "mqttstream.h"
  #include "scopeguard.h"
  #include "uriparser.h"
  
  // std::chrono
  #include "compat-chrono.h"
  
  // std
  #include <algorithm>
  #include <iterator>
  
  using namespace osdev::components::mqtt;
  
  namespace {
  
  #if defined(__clang__)
  #pragma GCC diagnostic push
  #pragma GCC diagnostic ignored "-Wunused-template"
  #endif
  
  OSDEV_COMPONENTS_HASMEMBER_TRAIT(onSuccess5)
  
  template <typename TRet>
  inline typename std::enable_if<!has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
  {
      return MQTTAsync_disconnectOptions_initializer;
  }
  
  template <typename TRet>
  inline typename std::enable_if<has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
  {
  // For some reason g++ on centos7 evaluates the function body even when it is discarded by SFINAE.
  // This leads to a compile error on an undefined symbol. We will use the old initializer macro, but this
  // method should not be chosen when the struct does not contain member onSuccess5!
  // On yocto warrior mqtt-paho-c 1.3.0 the macro MQTTAsync_disconnectOptions_initializer5 is not defined.
  // while the struct does have an onSuccess5 member. In that case we do need correct initializer code.
  // We fall back to the MQTTAsync_disconnectOptions_initializer macro and initialize
  // additional fields ourself (which unfortunately results in a pesky compiler warning about missing field initializers).
  #ifndef MQTTAsync_disconnectOptions_initializer5
  #pragma GCC diagnostic push
  #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
      TRet ret = MQTTAsync_disconnectOptions_initializer;
      ret.struct_version = 1;
      ret.onSuccess5 = nullptr;
      ret.onFailure5 = nullptr;
      return ret;
  #pragma GCC diagnostic pop
  #else
      return MQTTAsync_disconnectOptions_initializer5;
  #endif
  }
  
  template <typename TRet>
  struct Init
  {
      static TRet initialize()
      {
          return initializeMqttStruct<TRet>(static_cast<TRet*>(nullptr));
      }
  };
  #if defined(__clang__)
  #pragma GCC diagnostic pop
  #endif
  
  } // namespace
  
  std::atomic_int ClientPaho::s_numberOfInstances(0);
  
ca0cf29e   Steven   syntax fixes, con...
97
  ClientPaho::ClientPaho( const std::string& _endpoint,
51becbde   Peter M. Groen   Committed the ent...
98
      const std::string& _id,
ca0cf29e   Steven   syntax fixes, con...
99
100
      const std::function<void( const std::string&, ConnectionStatus )>& connectionStatusCallback,
      const std::function<void( const std::string& clientId, std::int32_t pubMsgToken )>& deliveryCompleteCallback )
51becbde   Peter M. Groen   Committed the ent...
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
      : m_mutex()
      , m_endpoint()
      , m_username()
      , m_password()
      , m_clientId(_id)
      , m_pendingOperations()
      , m_operationResult()
      , m_operationsCompleteCV()
      , m_subscriptions()
      , m_pendingSubscriptions()
      , m_subscribeTokenToTopic()
      , m_unsubscribeTokenToTopic()
      , m_pendingPublishes()
      , m_processPendingPublishes(false)
      , m_pendingPublishesReadyCV()
      , m_client()
      , m_connectionStatus(ConnectionStatus::Disconnected)
      , m_connectionStatusCallback(connectionStatusCallback)
      , m_deliveryCompleteCallback(deliveryCompleteCallback)
      , m_lastUnsubscribe(-1)
      , m_connectPromise()
      , m_disconnectPromise()
      , m_callbackEventQueue(m_clientId)
      , m_workerThread()
  {
ca0cf29e   Steven   syntax fixes, con...
126
      if( 0 == s_numberOfInstances++ )
76d01373   Peter M. Groen   Fix on connection
127
      {
ca0cf29e   Steven   syntax fixes, con...
128
          MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho );
51becbde   Peter M. Groen   Committed the ent...
129
      }
76d01373   Peter M. Groen   Fix on connection
130
  
ca0cf29e   Steven   syntax fixes, con...
131
      LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) );
51becbde   Peter M. Groen   Committed the ent...
132
      parseEndpoint(_endpoint);
76d01373   Peter M. Groen   Fix on connection
133
  
ca0cf29e   Steven   syntax fixes, con...
134
135
      auto rc = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr );
      if( MQTTASYNC_SUCCESS == rc )
51becbde   Peter M. Groen   Committed the ent...
136
      {
ca0cf29e   Steven   syntax fixes, con...
137
138
          MQTTAsync_setCallbacks( m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete );
          m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this );
51becbde   Peter M. Groen   Committed the ent...
139
140
141
      }
      else
      {
76d01373   Peter M. Groen   Fix on connection
142
          LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
143
144
145
146
147
      }
  }
  
  ClientPaho::~ClientPaho()
  {
ca0cf29e   Steven   syntax fixes, con...
148
      LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) );
51becbde   Peter M. Groen   Committed the ent...
149
150
151
152
      if( MQTTAsync_isConnected( m_client ) )
      {
          this->unsubscribeAll();
  
ca0cf29e   Steven   syntax fixes, con...
153
154
          this->waitForCompletion( std::chrono::milliseconds(2000), std::set<int32_t>{} );
          this->disconnect( true, 5000 );
51becbde   Peter M. Groen   Committed the ent...
155
156
157
158
      }
      else
      {
          // If the status was already disconnected this call does nothing
ca0cf29e   Steven   syntax fixes, con...
159
          setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
160
161
      }
  
ca0cf29e   Steven   syntax fixes, con...
162
      if( 0 == --s_numberOfInstances )
51becbde   Peter M. Groen   Committed the ent...
163
164
165
166
167
      {
          // encountered a case where termination of the logging system within paho led to a segfault.
          // This was a paho thread that was cleaned while at the same time the logging system was terminated.
          // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less
          // frequently.
ca0cf29e   Steven   syntax fixes, con...
168
          MQTTAsync_setTraceCallback( nullptr );
51becbde   Peter M. Groen   Committed the ent...
169
170
      }
  
ca0cf29e   Steven   syntax fixes, con...
171
      MQTTAsync_destroy( &m_client );
51becbde   Peter M. Groen   Committed the ent...
172
173
  
      m_callbackEventQueue.stop();
ca0cf29e   Steven   syntax fixes, con...
174
      if( m_workerThread.joinable() )
51becbde   Peter M. Groen   Committed the ent...
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
      {
          m_workerThread.join();
      }
  }
  
  std::string ClientPaho::clientId() const
  {
      return m_clientId;
  }
  
  ConnectionStatus ClientPaho::connectionStatus() const
  {
      return m_connectionStatus;
  }
  
31eece9b   Steven   added LWT ( last ...
190
  std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
51becbde   Peter M. Groen   Committed the ent...
191
192
193
  {
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
2b967ea7   Steven   promise is not wo...
194
          if( ConnectionStatus::Disconnected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
195
196
197
          {
              return -1;
          }
2b967ea7   Steven   promise is not wo...
198
          setConnectionStatus( ConnectionStatus::ConnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
199
200
      }
  
76d01373   Peter M. Groen   Fix on connection
201
202
      LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) );
  
51becbde   Peter M. Groen   Committed the ent...
203
      MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
9421324b   Peter M. Groen   First fix on conn...
204
      conn_opts.keepAliveInterval = 5;
51becbde   Peter M. Groen   Committed the ent...
205
      conn_opts.cleansession = 1;
2c0c99a5   Peter M. Groen   Fix connect Callb...
206
      conn_opts.onSuccess = nullptr;
51becbde   Peter M. Groen   Committed the ent...
207
208
209
      conn_opts.onFailure = &ClientPaho::onConnectFailure;
      conn_opts.context = this;
      conn_opts.automaticReconnect = 1;
31eece9b   Steven   added LWT ( last ...
210
  
76d01373   Peter M. Groen   Fix on connection
211
212
213
214
      // Make sure we get a signal if the promise is fulfilled
      auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect );
      if( MQTTASYNC_SUCCESS == ccb )
      {
ca0cf29e   Steven   syntax fixes, con...
215
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
76d01373   Peter M. Groen   Fix on connection
216
217
218
      }
      else
      {
ca0cf29e   Steven   syntax fixes, con...
219
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
76d01373   Peter M. Groen   Fix on connection
220
221
222
      }
  
      // Setup the last will and testament, if so desired.
31eece9b   Steven   added LWT ( last ...
223
224
225
226
227
228
229
      if( !lwt.topic().empty() )
      {
          MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
          will_opts.message = lwt.message().c_str();
          will_opts.topicName = lwt.topic().c_str();
  
          conn_opts.will = &will_opts;
76d01373   Peter M. Groen   Fix on connection
230
231
  
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) );
31eece9b   Steven   added LWT ( last ...
232
233
234
235
236
237
      }
      else
      {
          conn_opts.will = nullptr;
      }
  
2b967ea7   Steven   promise is not wo...
238
      if( !m_username.empty() )
51becbde   Peter M. Groen   Committed the ent...
239
240
241
242
      {
          conn_opts.username = m_username.c_str();
      }
  
2b967ea7   Steven   promise is not wo...
243
      if( !m_password.empty() )
51becbde   Peter M. Groen   Committed the ent...
244
245
246
247
248
249
250
      {
          conn_opts.password = m_password.c_str();
      }
  
      std::promise<void> waitForConnectPromise{};
      auto waitForConnect = waitForConnectPromise.get_future();
      m_connectPromise.reset();
2b967ea7   Steven   promise is not wo...
251
      if( wait )
51becbde   Peter M. Groen   Committed the ent...
252
      {
2b967ea7   Steven   promise is not wo...
253
          m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) );
51becbde   Peter M. Groen   Committed the ent...
254
255
256
      }
  
      {
2b967ea7   Steven   promise is not wo...
257
258
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
          if( !m_pendingOperations.insert( -100 ).second )
51becbde   Peter M. Groen   Committed the ent...
259
260
261
          {
              // Write something
          }
2b967ea7   Steven   promise is not wo...
262
          m_operationResult.erase( -100 );
51becbde   Peter M. Groen   Committed the ent...
263
264
      }
  
2b967ea7   Steven   promise is not wo...
265
266
      int rc = MQTTAsync_connect( m_client, &conn_opts );
      if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
267
      {
2b967ea7   Steven   promise is not wo...
268
269
          setConnectionStatus( ConnectionStatus::Disconnected );
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
270
271
272
273
          m_operationResult[-100] = false;
          m_pendingOperations.erase(-100);
      }
  
2b967ea7   Steven   promise is not wo...
274
      if( wait )
51becbde   Peter M. Groen   Committed the ent...
275
276
277
278
279
280
281
      {
          waitForConnect.get();
          m_connectPromise.reset();
      }
      return -100;
  }
  
0c424e03   Steven   syntax fixes. WIP
282
  std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
51becbde   Peter M. Groen   Committed the ent...
283
284
285
286
287
  {
      ConnectionStatus currentStatus = m_connectionStatus;
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
0c424e03   Steven   syntax fixes. WIP
288
289
          if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus )
          {
51becbde   Peter M. Groen   Committed the ent...
290
291
292
293
              return -1;
          }
  
          currentStatus = m_connectionStatus;
0c424e03   Steven   syntax fixes. WIP
294
          setConnectionStatus( ConnectionStatus::DisconnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
295
296
297
298
299
300
301
302
303
304
305
      }
  
      MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize();
      disconn_opts.timeout = timeoutMs;
      disconn_opts.onSuccess = &ClientPaho::onDisconnectSuccess;
      disconn_opts.onFailure = &ClientPaho::onDisconnectFailure;
      disconn_opts.context = this;
  
      std::promise<void> waitForDisconnectPromise{};
      auto waitForDisconnect = waitForDisconnectPromise.get_future();
      m_disconnectPromise.reset();
0c424e03   Steven   syntax fixes. WIP
306
307
      if( wait )
      {
51becbde   Peter M. Groen   Committed the ent...
308
309
310
311
312
          m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise));
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
313
          if( !m_pendingOperations.insert( -200 ).second )
51becbde   Peter M. Groen   Committed the ent...
314
          {
ca0cf29e   Steven   syntax fixes, con...
315
              LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
316
          }
ca0cf29e   Steven   syntax fixes, con...
317
          m_operationResult.erase( -200 );
51becbde   Peter M. Groen   Committed the ent...
318
319
      }
  
ca0cf29e   Steven   syntax fixes, con...
320
      int rc = MQTTAsync_disconnect( m_client, &disconn_opts );
0c424e03   Steven   syntax fixes. WIP
321
      if( MQTTASYNC_SUCCESS != rc )
76d01373   Peter M. Groen   Fix on connection
322
      {
0c424e03   Steven   syntax fixes. WIP
323
          if( MQTTASYNC_DISCONNECTED == rc )
76d01373   Peter M. Groen   Fix on connection
324
          {
51becbde   Peter M. Groen   Committed the ent...
325
326
327
              currentStatus = ConnectionStatus::Disconnected;
          }
  
0c424e03   Steven   syntax fixes. WIP
328
329
          setConnectionStatus( currentStatus );
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
330
          m_operationResult[-200] = false;
ca0cf29e   Steven   syntax fixes, con...
331
          m_pendingOperations.erase( -200 );
51becbde   Peter M. Groen   Committed the ent...
332
  
0c424e03   Steven   syntax fixes. WIP
333
          if( MQTTASYNC_DISCONNECTED == rc )
2b967ea7   Steven   promise is not wo...
334
          {
51becbde   Peter M. Groen   Committed the ent...
335
336
              return -1;
          }
ca0cf29e   Steven   syntax fixes, con...
337
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
338
339
      }
  
2b967ea7   Steven   promise is not wo...
340
341
      if( wait )
      {
51becbde   Peter M. Groen   Committed the ent...
342
343
          if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
          {
ca0cf29e   Steven   syntax fixes, con...
344
              LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) );
51becbde   Peter M. Groen   Committed the ent...
345
346
347
348
349
350
351
          }
          waitForDisconnect.get();
          m_disconnectPromise.reset();
      }
      return -200;
  }
  
ca0cf29e   Steven   syntax fixes, con...
352
  std::int32_t ClientPaho::publish( const MqttMessage& message, int qos )
51becbde   Peter M. Groen   Committed the ent...
353
  {
0c424e03   Steven   syntax fixes. WIP
354
      if( ConnectionStatus::DisconnectInProgress == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
355
      {
ca0cf29e   Steven   syntax fixes, con...
356
          LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) );
51becbde   Peter M. Groen   Committed the ent...
357
358
          return -1;
      }
0c424e03   Steven   syntax fixes. WIP
359
      else if( ConnectionStatus::Disconnected == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
360
      {
ca0cf29e   Steven   syntax fixes, con...
361
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) );
2b967ea7   Steven   promise is not wo...
362
          connect( true );
51becbde   Peter M. Groen   Committed the ent...
363
364
      }
  
0c424e03   Steven   syntax fixes. WIP
365
      if( !isValidTopic(message.topic() ) )
51becbde   Peter M. Groen   Committed the ent...
366
      {
ca0cf29e   Steven   syntax fixes, con...
367
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) );
51becbde   Peter M. Groen   Committed the ent...
368
369
      }
  
0c424e03   Steven   syntax fixes. WIP
370
      if( qos > 2 )
51becbde   Peter M. Groen   Committed the ent...
371
372
373
      {
          qos = 2;
      }
0c424e03   Steven   syntax fixes. WIP
374
      else if( qos < 0 )
51becbde   Peter M. Groen   Committed the ent...
375
376
377
378
      {
          qos = 0;
      }
  
51becbde   Peter M. Groen   Committed the ent...
379
      std::unique_lock<std::mutex> lck(m_mutex);
e8f6540b   Steven   also push to queu...
380
      if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || ConnectionStatus::ConnectInProgress == m_connectionStatus || m_processPendingPublishes )
a670240b   Peter M. Groen   Fix on connection
381
      {
51becbde   Peter M. Groen   Committed the ent...
382
          m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
e8f6540b   Steven   also push to queu...
383
          if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || ConnectionStatus::ConnectInProgress == m_connectionStatus )
a670240b   Peter M. Groen   Fix on connection
384
          {
0c424e03   Steven   syntax fixes. WIP
385
              LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." );
ca0cf29e   Steven   syntax fixes, con...
386
              m_pendingPublishes.push_front( Publish{ qos, message } );
51becbde   Peter M. Groen   Committed the ent...
387
388
389
390
              return -1;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
391
      return publishInternal( message, qos );
51becbde   Peter M. Groen   Committed the ent...
392
393
394
395
396
397
  }
  
  void ClientPaho::publishPending()
  {
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
0c424e03   Steven   syntax fixes. WIP
398
          if( !m_processPendingPublishes )
2b967ea7   Steven   promise is not wo...
399
          {
51becbde   Peter M. Groen   Committed the ent...
400
401
402
403
              return;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
404
      if( ConnectionStatus::Connected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
405
      {
a670240b   Peter M. Groen   Fix on connection
406
          LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) )
51becbde   Peter M. Groen   Committed the ent...
407
408
      }
  
0c424e03   Steven   syntax fixes. WIP
409
      while( !m_pendingPublishes.empty() )
51becbde   Peter M. Groen   Committed the ent...
410
411
      {
          const auto& pub = m_pendingPublishes.back();
ca0cf29e   Steven   syntax fixes, con...
412
          publishInternal( pub.data, pub.qos );
51becbde   Peter M. Groen   Committed the ent...
413
414
415
416
417
418
419
420
421
422
423
  
          m_pendingPublishes.pop_back();
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          m_processPendingPublishes = false;
      }
      m_pendingPublishesReadyCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
424
  std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb )
51becbde   Peter M. Groen   Committed the ent...
425
  {
0c424e03   Steven   syntax fixes. WIP
426
      if( ConnectionStatus::Connected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
427
      {
11fe0b09   Peter M. Groen   Rework for subscr...
428
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) );
51becbde   Peter M. Groen   Committed the ent...
429
430
      }
  
0c424e03   Steven   syntax fixes. WIP
431
      if( !isValidTopic( topic ) )
51becbde   Peter M. Groen   Committed the ent...
432
433
      {
          // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
11fe0b09   Peter M. Groen   Rework for subscr...
434
435
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) );
          return -1;
51becbde   Peter M. Groen   Committed the ent...
436
437
      }
  
0c424e03   Steven   syntax fixes. WIP
438
      if( qos > 2 )
51becbde   Peter M. Groen   Committed the ent...
439
440
441
      {
          qos = 2;
      }
0c424e03   Steven   syntax fixes. WIP
442
      else if( qos < 0 )
51becbde   Peter M. Groen   Committed the ent...
443
444
445
446
447
448
449
450
      {
          qos = 0;
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  
          auto itExisting = m_subscriptions.find(topic);
0c424e03   Steven   syntax fixes. WIP
451
452
453
454
          if( m_subscriptions.end() != itExisting )
          {
              if( itExisting->second.qos == qos )
              {
51becbde   Peter M. Groen   Committed the ent...
455
456
457
458
459
460
                  return -1;
              }
              // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic);
          }
  
          auto itPending = m_pendingSubscriptions.find(topic);
0c424e03   Steven   syntax fixes. WIP
461
462
463
464
465
466
467
          if( m_pendingSubscriptions.end() != itPending )
          {
              if( itPending->second.qos == qos )
              {
                  auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; } );
                  if( m_subscribeTokenToTopic.end() != itToken )
                  {
51becbde   Peter M. Groen   Committed the ent...
468
469
                      return itToken->first;
                  }
0c424e03   Steven   syntax fixes. WIP
470
471
                  else
                  {
51becbde   Peter M. Groen   Committed the ent...
472
473
474
475
476
477
478
                      return -1;
                  }
              }
              // (OverlappingTopicException, "pending subscription with same topic, but different qos", topic);
          }
  
          std::string existingTopic{};
0c424e03   Steven   syntax fixes. WIP
479
          if( isOverlappingInternal( topic, existingTopic ) )
51becbde   Peter M. Groen   Committed the ent...
480
481
          {
              // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
11fe0b09   Peter M. Groen   Rework for subscr...
482
              LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
483
484
          }
  
ca0cf29e   Steven   syntax fixes, con...
485
486
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
          m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) );
51becbde   Peter M. Groen   Committed the ent...
487
      }
0c424e03   Steven   syntax fixes. WIP
488
      return subscribeInternal( topic, qos );
51becbde   Peter M. Groen   Committed the ent...
489
490
491
492
  }
  
  void ClientPaho::resubscribe()
  {
ca0cf29e   Steven   syntax fixes, con...
493
      decltype( m_pendingSubscriptions ) pendingSubscriptions{};
51becbde   Peter M. Groen   Committed the ent...
494
495
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
496
          std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) );
51becbde   Peter M. Groen   Committed the ent...
497
498
      }
  
0c424e03   Steven   syntax fixes. WIP
499
      for( const auto& s : pendingSubscriptions )
51becbde   Peter M. Groen   Committed the ent...
500
      {
0c424e03   Steven   syntax fixes. WIP
501
          subscribeInternal( s.first, s.second.qos );
51becbde   Peter M. Groen   Committed the ent...
502
503
504
      }
  }
  
31eece9b   Steven   added LWT ( last ...
505
  std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos )
51becbde   Peter M. Groen   Committed the ent...
506
507
  {
      {
0c424e03   Steven   syntax fixes. WIP
508
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
509
          bool found = false;
0c424e03   Steven   syntax fixes. WIP
510
          for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
511
          {
0c424e03   Steven   syntax fixes. WIP
512
              if( topic == s.first && qos == s.second.qos )
51becbde   Peter M. Groen   Committed the ent...
513
514
515
516
517
              {
                  found = true;
                  break;
              }
          }
0c424e03   Steven   syntax fixes. WIP
518
          if( !found )
51becbde   Peter M. Groen   Committed the ent...
519
520
521
522
523
524
525
526
527
528
529
530
531
532
          {
              return -1;
          }
      }
  
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onUnsubscribeSuccess;
      opts.onFailure = &ClientPaho::onUnsubscribeFailure;
      opts.context = this;
  
      {
          // Need to lock the mutex because it is possible that the callback is faster than
          // the insertion of the token into the pending operations.
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
533
          auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts );
0c424e03   Steven   syntax fixes. WIP
534
          if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
535
          {
ca0cf29e   Steven   syntax fixes, con...
536
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
537
538
          }
  
0c424e03   Steven   syntax fixes. WIP
539
          if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
540
          {
ca0cf29e   Steven   syntax fixes, con...
541
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token )  + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
542
543
          }
  
0c424e03   Steven   syntax fixes. WIP
544
545
          m_operationResult.erase( opts.token );
          if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 )
51becbde   Peter M. Groen   Committed the ent...
546
          {
ca0cf29e   Steven   syntax fixes, con...
547
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic  " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
548
549
550
551
552
553
554
555
556
557
558
559
560
          }
          m_lastUnsubscribe = opts.token; // centos7 workaround
          m_unsubscribeTokenToTopic[opts.token] = topic;
      }
  
      // Because of a bug in paho-c on centos7 the unsubscribes need to be sequential (best effort).
      this->waitForCompletion(std::chrono::seconds(1), std::set<int32_t>{ opts.token });
  
      return opts.token;
  }
  
  void ClientPaho::unsubscribeAll()
  {
0c424e03   Steven   syntax fixes. WIP
561
      decltype( m_subscriptions ) subscriptions{};
51becbde   Peter M. Groen   Committed the ent...
562
563
564
565
566
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          subscriptions = m_subscriptions;
      }
  
0c424e03   Steven   syntax fixes. WIP
567
568
      for( const auto& s : subscriptions )
      {
ca0cf29e   Steven   syntax fixes, con...
569
          this->unsubscribe( s.first, s.second.qos );
51becbde   Peter M. Groen   Committed the ent...
570
571
572
573
574
      }
  }
  
  std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const
  {
ca0cf29e   Steven   syntax fixes, con...
575
576
577
      if( waitFor <= std::chrono::milliseconds( 0 ) )
      {
          return std::chrono::milliseconds( 0 );
51becbde   Peter M. Groen   Committed the ent...
578
579
580
      }
      std::chrono::milliseconds timeElapsed{};
      {
ca0cf29e   Steven   syntax fixes, con...
581
          osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds )
51becbde   Peter M. Groen   Committed the ent...
582
          {
ca0cf29e   Steven   syntax fixes, con...
583
              timeElapsed = std::chrono::ceil<std::chrono::milliseconds>( sinceStart );
51becbde   Peter M. Groen   Committed the ent...
584
585
          });
          std::unique_lock<std::mutex> lck(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
586
  
51becbde   Peter M. Groen   Committed the ent...
587
588
589
          // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations);
          m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]()
          {
ca0cf29e   Steven   syntax fixes, con...
590
              if( tokens.empty() )
51becbde   Peter M. Groen   Committed the ent...
591
592
593
              { // wait for all operations to end
                  return m_pendingOperations.empty();
              }
ca0cf29e   Steven   syntax fixes, con...
594
              else if( tokens.size() == 1 )
51becbde   Peter M. Groen   Committed the ent...
595
              {
ca0cf29e   Steven   syntax fixes, con...
596
                  return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end();
51becbde   Peter M. Groen   Committed the ent...
597
598
599
600
601
602
603
604
605
              }
              std::vector<std::int32_t> intersect{};
              std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect));
              return intersect.empty();
          } );
      }
      return timeElapsed;
  }
  
ca0cf29e   Steven   syntax fixes, con...
606
  bool ClientPaho::isOverlapping( const std::string& topic ) const
51becbde   Peter M. Groen   Committed the ent...
607
608
  {
      std::string existingTopic{};
ca0cf29e   Steven   syntax fixes, con...
609
      return isOverlapping( topic, existingTopic );
51becbde   Peter M. Groen   Committed the ent...
610
611
  }
  
ca0cf29e   Steven   syntax fixes, con...
612
  bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const
51becbde   Peter M. Groen   Committed the ent...
613
614
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
615
      return isOverlappingInternal( topic, existingTopic );
51becbde   Peter M. Groen   Committed the ent...
616
617
618
619
620
621
622
  }
  
  std::vector<std::int32_t> ClientPaho::pendingOperations() const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      std::vector<std::int32_t> retval{};
      retval.resize(m_pendingOperations.size());
ca0cf29e   Steven   syntax fixes, con...
623
      std::copy( m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin() );
51becbde   Peter M. Groen   Committed the ent...
624
625
626
627
628
629
630
631
632
      return retval;
  }
  
  bool ClientPaho::hasPendingSubscriptions() const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      return !m_pendingSubscriptions.empty();
  }
  
ca0cf29e   Steven   syntax fixes, con...
633
  boost::optional<bool> ClientPaho::operationResult( std::int32_t token ) const
51becbde   Peter M. Groen   Committed the ent...
634
635
636
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      boost::optional<bool> ret{};
ca0cf29e   Steven   syntax fixes, con...
637
638
      auto cit = m_operationResult.find( token );
      if( m_operationResult.end() != cit )
51becbde   Peter M. Groen   Committed the ent...
639
640
641
642
643
644
      {
          ret = cit->second;
      }
      return ret;
  }
  
ca0cf29e   Steven   syntax fixes, con...
645
  void ClientPaho::parseEndpoint( const std::string& _endpoint )
51becbde   Peter M. Groen   Committed the ent...
646
  {
ca0cf29e   Steven   syntax fixes, con...
647
648
      auto ep = UriParser::parse( _endpoint );
      if( ep.find( "user" ) != ep.end() )
51becbde   Peter M. Groen   Committed the ent...
649
650
651
652
653
      {
          m_username = ep["user"];
          ep["user"].clear();
      }
  
ca0cf29e   Steven   syntax fixes, con...
654
      if( ep.find( "password" ) != ep.end() )
51becbde   Peter M. Groen   Committed the ent...
655
656
657
658
      {
          m_password = ep["password"];
          ep["password"].clear();
      }
ca0cf29e   Steven   syntax fixes, con...
659
      m_endpoint = UriParser::toString( ep );
51becbde   Peter M. Groen   Committed the ent...
660
661
  }
  
ca0cf29e   Steven   syntax fixes, con...
662
  std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos )
51becbde   Peter M. Groen   Committed the ent...
663
664
665
666
667
668
669
670
671
672
673
674
  {
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onPublishSuccess;
      opts.onFailure = &ClientPaho::onPublishFailure;
      opts.context = this;
      auto msg = message.toAsyncMessage();
      msg.qos = qos;
  
      // Need to lock the mutex because it is possible that the callback is faster than
      // the insertion of the token into the pending operations.
  
      // OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
675
      auto rc = MQTTAsync_sendMessage( m_client, message.topic().c_str(), &msg, &opts );
ca0cf29e   Steven   syntax fixes, con...
676
      if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
677
      {
ca0cf29e   Steven   syntax fixes, con...
678
          LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
679
680
      }
  
ca0cf29e   Steven   syntax fixes, con...
681
      if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
682
      {
d557d523   Peter M. Groen   Fix deferred subs...
683
          // LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
684
      }
ca0cf29e   Steven   syntax fixes, con...
685
      m_operationResult.erase( opts.token );
51becbde   Peter M. Groen   Committed the ent...
686
687
688
      return opts.token;
  }
  
ca0cf29e   Steven   syntax fixes, con...
689
  std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos )
51becbde   Peter M. Groen   Committed the ent...
690
691
692
693
694
695
696
697
698
  {
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onSubscribeSuccess;
      opts.onFailure = &ClientPaho::onSubscribeFailure;
      opts.context = this;
  
      // Need to lock the mutex because it is possible that the callback is faster than
      // the insertion of the token into the pending operations.
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
699
      auto rc = MQTTAsync_subscribe( m_client, topic.c_str(), qos, &opts );
51becbde   Peter M. Groen   Committed the ent...
700
701
      if (MQTTASYNC_SUCCESS != rc)
      {
ca0cf29e   Steven   syntax fixes, con...
702
703
          m_pendingSubscriptions.erase( topic );
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
704
705
      }
  
ca0cf29e   Steven   syntax fixes, con...
706
      if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
707
      {
ca0cf29e   Steven   syntax fixes, con...
708
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
709
      }
ca0cf29e   Steven   syntax fixes, con...
710
711
      m_operationResult.erase( opts.token );
      if( m_subscribeTokenToTopic.count( opts.token ) > 0 )
51becbde   Peter M. Groen   Committed the ent...
712
      {
ca0cf29e   Steven   syntax fixes, con...
713
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
714
715
716
717
718
      }
      m_subscribeTokenToTopic[opts.token] = topic;
      return opts.token;
  }
  
0c424e03   Steven   syntax fixes. WIP
719
  void ClientPaho::setConnectionStatus( ConnectionStatus status )
51becbde   Peter M. Groen   Committed the ent...
720
  {
d557d523   Peter M. Groen   Fix deferred subs...
721
      LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - " ) );
51becbde   Peter M. Groen   Committed the ent...
722
723
      ConnectionStatus curStatus = m_connectionStatus;
      m_connectionStatus = status;
0c424e03   Steven   syntax fixes. WIP
724
      if( status != curStatus && m_connectionStatusCallback )
51becbde   Peter M. Groen   Committed the ent...
725
      {
d557d523   Peter M. Groen   Fix deferred subs...
726
          LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - Calling m_connectionStatusCallback" ) );
0c424e03   Steven   syntax fixes. WIP
727
          m_connectionStatusCallback( m_clientId, status );
51becbde   Peter M. Groen   Committed the ent...
728
729
730
      }
  }
  
0c424e03   Steven   syntax fixes. WIP
731
  bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const
51becbde   Peter M. Groen   Committed the ent...
732
733
  {
      existingTopic.clear();
0c424e03   Steven   syntax fixes. WIP
734
      for( const auto& s : m_pendingSubscriptions )
51becbde   Peter M. Groen   Committed the ent...
735
      {
0c424e03   Steven   syntax fixes. WIP
736
          if( testForOverlap( s.first, topic ) )
51becbde   Peter M. Groen   Committed the ent...
737
738
739
740
741
742
          {
              existingTopic = s.first;
              return true;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
743
      for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
744
      {
ed30280f   Steven   last changes.
745
          if( testForOverlap( s.first, topic ) )
51becbde   Peter M. Groen   Committed the ent...
746
747
748
749
750
751
752
753
          {
              existingTopic = s.first;
              return true;
          }
      }
      return false;
  }
  
ed30280f   Steven   last changes.
754
  void ClientPaho::pushIncomingEvent( std::function<void()> ev )
51becbde   Peter M. Groen   Committed the ent...
755
  {
ed30280f   Steven   last changes.
756
      m_callbackEventQueue.push( ev );
51becbde   Peter M. Groen   Committed the ent...
757
758
759
760
  }
  
  void ClientPaho::callbackEventHandler()
  {
ca0cf29e   Steven   syntax fixes, con...
761
      LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) );
0c424e03   Steven   syntax fixes. WIP
762
763
      for( ;; )
      {
51becbde   Peter M. Groen   Committed the ent...
764
          std::vector<std::function<void()>> events;
0c424e03   Steven   syntax fixes. WIP
765
          if( !m_callbackEventQueue.pop(events) )
51becbde   Peter M. Groen   Committed the ent...
766
767
768
769
          {
              break;
          }
  
0c424e03   Steven   syntax fixes. WIP
770
          for( const auto& ev : events )
51becbde   Peter M. Groen   Committed the ent...
771
772
          {
              ev();
51becbde   Peter M. Groen   Committed the ent...
773
774
          }
      }
ed30280f   Steven   last changes.
775
      LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - leaving callback event handler" ) );
51becbde   Peter M. Groen   Committed the ent...
776
  }
0c424e03   Steven   syntax fixes. WIP
777
  void ClientPaho::onConnectOnInstance( const std::string& cause )
51becbde   Peter M. Groen   Committed the ent...
778
  {
ca0cf29e   Steven   syntax fixes, con...
779
      (void) cause;
51becbde   Peter M. Groen   Committed the ent...
780
781
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
782
          std::copy( m_subscriptions.begin(), m_subscriptions.end(), std::inserter( m_pendingSubscriptions, m_pendingSubscriptions.end() ) );
51becbde   Peter M. Groen   Committed the ent...
783
784
785
786
          m_subscriptions.clear();
          m_processPendingPublishes = true; // all publishes are on hold until publishPending is called.
      }
  
ed30280f   Steven   last changes.
787
      setConnectionStatus( ConnectionStatus::Connected );
51becbde   Peter M. Groen   Committed the ent...
788
789
  }
  
2c0c99a5   Peter M. Groen   Fix connect Callb...
790
  void ClientPaho::onConnectSuccessOnInstance()
51becbde   Peter M. Groen   Committed the ent...
791
  {
d557d523   Peter M. Groen   Fix deferred subs...
792
793
      LogDebug( "[ClientPaho::onConnectSuccessOnInstance]",
                std::string( m_clientId + " - onConnectSuccessOnInstance triggered." ) );
51becbde   Peter M. Groen   Committed the ent...
794
795
796
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // Register the connect callback that is used in reconnect scenarios.
0c424e03   Steven   syntax fixes. WIP
797
798
          auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect );
          if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
799
          {
0c424e03   Steven   syntax fixes. WIP
800
              LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
51becbde   Peter M. Groen   Committed the ent...
801
          }
2c0c99a5   Peter M. Groen   Fix connect Callb...
802
  
51becbde   Peter M. Groen   Committed the ent...
803
804
805
806
807
808
          // For MQTTV5
          //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
          //if (MQTTASYNC_SUCCESS != rc) {
          //    // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
          //}
          // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
2c0c99a5   Peter M. Groen   Fix connect Callb...
809
  
51becbde   Peter M. Groen   Committed the ent...
810
811
812
          m_operationResult[-100] = true;
          m_pendingOperations.erase(-100);
      }
9421324b   Peter M. Groen   First fix on conn...
813
  
0c424e03   Steven   syntax fixes. WIP
814
815
      setConnectionStatus( ConnectionStatus::Connected );
      if( m_connectPromise )
51becbde   Peter M. Groen   Committed the ent...
816
      {
ca0cf29e   Steven   syntax fixes, con...
817
          LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) );
51becbde   Peter M. Groen   Committed the ent...
818
819
820
821
822
          m_connectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
823
  void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
824
  {
0c424e03   Steven   syntax fixes. WIP
825
826
      (void) response;
      LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
51becbde   Peter M. Groen   Committed the ent...
827
828
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
51becbde   Peter M. Groen   Committed the ent...
829
830
831
832
          // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
          m_operationResult[-100] = false;
          m_pendingOperations.erase(-100);
      }
ca0cf29e   Steven   syntax fixes, con...
833
      if( ConnectionStatus::ConnectInProgress == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
834
      {
ca0cf29e   Steven   syntax fixes, con...
835
          setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
836
837
838
839
840
841
842
843
844
      }
      m_operationsCompleteCV.notify_all();
  }
  
  //void ClientPaho::onDisconnectOnInstance(enum MQTTReasonCodes reasonCode)
  //{
  //    MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode));
  //}
  
ca0cf29e   Steven   syntax fixes, con...
845
  void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& )
51becbde   Peter M. Groen   Committed the ent...
846
  {
ca0cf29e   Steven   syntax fixes, con...
847
      LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) );
51becbde   Peter M. Groen   Committed the ent...
848
849
850
851
852
853
854
855
856
857
858
859
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          m_subscriptions.clear();
          m_pendingSubscriptions.clear();
          m_subscribeTokenToTopic.clear();
          m_unsubscribeTokenToTopic.clear();
  
          // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - pending operations : %2, removing all operations", m_clientId, m_pendingOperations);
          m_operationResult[-200] = true;
          m_pendingOperations.clear();
      }
  
0c424e03   Steven   syntax fixes. WIP
860
      setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
861
  
0c424e03   Steven   syntax fixes. WIP
862
863
      if( m_disconnectPromise )
      {
51becbde   Peter M. Groen   Committed the ent...
864
865
866
867
868
          m_disconnectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
869
  void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
870
  {
0c424e03   Steven   syntax fixes. WIP
871
      (void) response;
ca0cf29e   Steven   syntax fixes, con...
872
      LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) );
51becbde   Peter M. Groen   Committed the ent...
873
      {
ca0cf29e   Steven   syntax fixes, con...
874
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);        
51becbde   Peter M. Groen   Committed the ent...
875
876
877
878
879
          // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations);
          m_operationResult[-200] = false;
          m_pendingOperations.erase(-200);
      }
  
0c424e03   Steven   syntax fixes. WIP
880
      if( MQTTAsync_isConnected( m_client ) )
51becbde   Peter M. Groen   Committed the ent...
881
      {
0c424e03   Steven   syntax fixes. WIP
882
          setConnectionStatus( ConnectionStatus::Connected );
51becbde   Peter M. Groen   Committed the ent...
883
884
885
      }
      else
      {
0c424e03   Steven   syntax fixes. WIP
886
          setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
887
888
      }
  
0c424e03   Steven   syntax fixes. WIP
889
      if( m_disconnectPromise )
51becbde   Peter M. Groen   Committed the ent...
890
891
892
893
894
895
      {
          m_disconnectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
896
  void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
897
898
  {
      auto pd = response.publishData();
d557d523   Peter M. Groen   Fix deferred subs...
899
      // LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
51becbde   Peter M. Groen   Committed the ent...
900
901
902
903
904
905
906
907
908
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = true;
          m_pendingOperations.erase(response.token());
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
909
  void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
910
  {
ca0cf29e   Steven   syntax fixes, con...
911
      LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
51becbde   Peter M. Groen   Committed the ent...
912
913
914
915
916
917
918
919
920
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
          m_pendingOperations.erase(response.token());
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
921
  void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
922
  {
ca0cf29e   Steven   syntax fixes, con...
923
924
925
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) );
  
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
51becbde   Peter M. Groen   Committed the ent...
926
927
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      bool operationOk = false;
ca0cf29e   Steven   syntax fixes, con...
928
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]()
51becbde   Peter M. Groen   Committed the ent...
929
930
931
      {
          // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = operationOk;
ca0cf29e   Steven   syntax fixes, con...
932
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
933
      });
ca0cf29e   Steven   syntax fixes, con...
934
935
      auto it = m_subscribeTokenToTopic.find( response.token() );
      if( m_subscribeTokenToTopic.end() == it )
0c424e03   Steven   syntax fixes. WIP
936
      {
ca0cf29e   Steven   syntax fixes, con...
937
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
938
939
940
941
942
          return;
      }
      auto topic = it->second;
      m_subscribeTokenToTopic.erase(it);
  
ca0cf29e   Steven   syntax fixes, con...
943
944
      auto pendingIt = m_pendingSubscriptions.find( topic );
      if( m_pendingSubscriptions.end() == pendingIt )
51becbde   Peter M. Groen   Committed the ent...
945
      {
ca0cf29e   Steven   syntax fixes, con...
946
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
947
948
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
949
      if( response.qos() != pendingIt->second.qos )
51becbde   Peter M. Groen   Committed the ent...
950
      {
ca0cf29e   Steven   syntax fixes, con...
951
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) );
51becbde   Peter M. Groen   Committed the ent...
952
      }
ca0cf29e   Steven   syntax fixes, con...
953
954
955
956
  
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) );
      m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) );
      m_pendingSubscriptions.erase( pendingIt );
51becbde   Peter M. Groen   Committed the ent...
957
958
959
      operationOk = true;
  }
  
ca0cf29e   Steven   syntax fixes, con...
960
  void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
961
  {
ca0cf29e   Steven   syntax fixes, con...
962
963
964
      LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
  
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
965
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
966
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
51becbde   Peter M. Groen   Committed the ent...
967
968
969
      {
          // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
ca0cf29e   Steven   syntax fixes, con...
970
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
971
972
      });
  
ca0cf29e   Steven   syntax fixes, con...
973
974
      auto it = m_subscribeTokenToTopic.find( response.token() );
      if( m_subscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
975
      {
ca0cf29e   Steven   syntax fixes, con...
976
          LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
977
978
979
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
980
      m_subscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
981
  
ca0cf29e   Steven   syntax fixes, con...
982
983
      auto pendingIt = m_pendingSubscriptions.find( topic );
      if( m_pendingSubscriptions.end() == pendingIt )
51becbde   Peter M. Groen   Committed the ent...
984
      {
ca0cf29e   Steven   syntax fixes, con...
985
          LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
986
987
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
988
989
      LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) );
      m_pendingSubscriptions.erase( pendingIt );
51becbde   Peter M. Groen   Committed the ent...
990
991
  }
  
ca0cf29e   Steven   syntax fixes, con...
992
  void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
993
  {
ca0cf29e   Steven   syntax fixes, con...
994
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) );
51becbde   Peter M. Groen   Committed the ent...
995
  
ca0cf29e   Steven   syntax fixes, con...
996
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
997
998
999
1000
1001
1002
1003
1004
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  
      // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token.
      // As a workaround the last unsubscribe token is stored and is used when no valid token is available.
      // This is by no means bullet proof because rapid unsubscribes in succession will overwrite this member
      // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled
      // sequentially (see ClientPaho::unsubscribe)!
      auto token = response.token();
ca0cf29e   Steven   syntax fixes, con...
1005
      if( -1 == token )
51becbde   Peter M. Groen   Committed the ent...
1006
1007
1008
1009
1010
1011
      {
          token = m_lastUnsubscribe;
          m_lastUnsubscribe = -1;
      }
  
      bool operationOk = false;
ca0cf29e   Steven   syntax fixes, con...
1012
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]()
51becbde   Peter M. Groen   Committed the ent...
1013
1014
1015
      {
          // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token);
          m_operationResult[token] = operationOk;
ca0cf29e   Steven   syntax fixes, con...
1016
          m_pendingOperations.erase( token );
51becbde   Peter M. Groen   Committed the ent...
1017
1018
      });
  
ca0cf29e   Steven   syntax fixes, con...
1019
1020
      auto it = m_unsubscribeTokenToTopic.find( token );
      if( m_unsubscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
1021
      {
ca0cf29e   Steven   syntax fixes, con...
1022
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) );
51becbde   Peter M. Groen   Committed the ent...
1023
1024
1025
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
1026
      m_unsubscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
1027
  
ca0cf29e   Steven   syntax fixes, con...
1028
1029
1030
1031
      auto registeredIt = m_subscriptions.find( topic );
      if( m_subscriptions.end() == registeredIt )
      {
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1032
1033
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
1034
1035
1036
  
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) );
      m_subscriptions.erase( registeredIt );
51becbde   Peter M. Groen   Committed the ent...
1037
1038
1039
      operationOk = true;
  }
  
ca0cf29e   Steven   syntax fixes, con...
1040
  void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
1041
  {
ca0cf29e   Steven   syntax fixes, con...
1042
1043
      LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
1044
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
1045
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
51becbde   Peter M. Groen   Committed the ent...
1046
1047
1048
      {
          // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
ca0cf29e   Steven   syntax fixes, con...
1049
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
1050
1051
      });
  
ca0cf29e   Steven   syntax fixes, con...
1052
1053
      auto it = m_unsubscribeTokenToTopic.find( response.token() );
      if( m_unsubscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
1054
      {
ca0cf29e   Steven   syntax fixes, con...
1055
          LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1056
1057
1058
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
1059
      m_unsubscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
1060
1061
  }
  
ca0cf29e   Steven   syntax fixes, con...
1062
  int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message )
51becbde   Peter M. Groen   Committed the ent...
1063
  {
ca0cf29e   Steven   syntax fixes, con...
1064
      LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1065
  
ca0cf29e   Steven   syntax fixes, con...
1066
      std::function<void( MqttMessage )> cb;
51becbde   Peter M. Groen   Committed the ent...
1067
1068
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
1069
          for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
1070
          {
ca0cf29e   Steven   syntax fixes, con...
1071
              if( boost::regex_match( message.topic(), s.second.topicRegex ) )
51becbde   Peter M. Groen   Committed the ent...
1072
1073
1074
1075
1076
1077
              {
                  cb = s.second.callback;
              }
          }
      }
  
ca0cf29e   Steven   syntax fixes, con...
1078
      if( cb )
51becbde   Peter M. Groen   Committed the ent...
1079
      {
ca0cf29e   Steven   syntax fixes, con...
1080
          cb( message );
51becbde   Peter M. Groen   Committed the ent...
1081
1082
1083
      }
      else
      {
ca0cf29e   Steven   syntax fixes, con...
1084
          LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) );
51becbde   Peter M. Groen   Committed the ent...
1085
1086
1087
1088
      }
      return 1;
  }
  
ca0cf29e   Steven   syntax fixes, con...
1089
  void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token )
51becbde   Peter M. Groen   Committed the ent...
1090
  {
ca0cf29e   Steven   syntax fixes, con...
1091
1092
      LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) );
      if( m_deliveryCompleteCallback )
51becbde   Peter M. Groen   Committed the ent...
1093
      {
ca0cf29e   Steven   syntax fixes, con...
1094
          m_deliveryCompleteCallback( m_clientId, static_cast<std::int32_t>( token ) );
51becbde   Peter M. Groen   Committed the ent...
1095
1096
1097
      }
  }
  
ca0cf29e   Steven   syntax fixes, con...
1098
  void ClientPaho::onConnectionLostOnInstance( const std::string& cause )
51becbde   Peter M. Groen   Committed the ent...
1099
  {
ca0cf29e   Steven   syntax fixes, con...
1100
      (void) cause;
51becbde   Peter M. Groen   Committed the ent...
1101
      // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause);
ca0cf29e   Steven   syntax fixes, con...
1102
      setConnectionStatus( ConnectionStatus::ReconnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
1103
1104
1105
  
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      // Remove all tokens related to subscriptions from the active operations.
ca0cf29e   Steven   syntax fixes, con...
1106
      for( const auto& p : m_subscribeTokenToTopic )
51becbde   Peter M. Groen   Committed the ent...
1107
1108
      {
          // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
ca0cf29e   Steven   syntax fixes, con...
1109
          m_pendingOperations.erase( p.first );
51becbde   Peter M. Groen   Committed the ent...
1110
1111
      }
  
ca0cf29e   Steven   syntax fixes, con...
1112
      for( const auto& p : m_unsubscribeTokenToTopic )
51becbde   Peter M. Groen   Committed the ent...
1113
1114
      {
          // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
ca0cf29e   Steven   syntax fixes, con...
1115
          m_pendingOperations.erase( p.first );
51becbde   Peter M. Groen   Committed the ent...
1116
1117
1118
1119
1120
1121
1122
      }
      // Clear the administration used in the subscribe process.
      m_subscribeTokenToTopic.clear();
      m_unsubscribeTokenToTopic.clear();
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1123
  void ClientPaho::onFirstConnect( void* context, char* cause )
9421324b   Peter M. Groen   First fix on conn...
1124
1125
  {
      LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1126
      if( context )
9421324b   Peter M. Groen   First fix on conn...
1127
      {
ca0cf29e   Steven   syntax fixes, con...
1128
1129
          auto *cl = reinterpret_cast<ClientPaho*>( context );
          std::string reason( nullptr == cause ? "Unknown cause" : cause );
e8f6540b   Steven   also push to queu...
1130
1131
          //cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } );
          cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } );
9421324b   Peter M. Groen   First fix on conn...
1132
1133
1134
      }
  }
  
ca0cf29e   Steven   syntax fixes, con...
1135
  void ClientPaho::onConnect( void* context, char* cause )
51becbde   Peter M. Groen   Committed the ent...
1136
  {
9421324b   Peter M. Groen   First fix on conn...
1137
      LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1138
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1139
      {
ca0cf29e   Steven   syntax fixes, con...
1140
1141
1142
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          std::string reason( nullptr == cause ? "unknown cause" : cause );
          cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } );
51becbde   Peter M. Groen   Committed the ent...
1143
1144
1145
1146
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1147
  void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1148
  {
2c0c99a5   Peter M. Groen   Fix connect Callb...
1149
      LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1150
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1151
      {
ca0cf29e   Steven   syntax fixes, con...
1152
1153
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
76d01373   Peter M. Groen   Fix on connection
1154
          {
51becbde   Peter M. Groen   Committed the ent...
1155
              // connect should always have a valid response struct.
ca0cf29e   Steven   syntax fixes, con...
1156
              LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" );
2c0c99a5   Peter M. Groen   Fix connect Callb...
1157
              return;
51becbde   Peter M. Groen   Committed the ent...
1158
          }
2c0c99a5   Peter M. Groen   Fix connect Callb...
1159
          // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
ca0cf29e   Steven   syntax fixes, con...
1160
          cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } );
51becbde   Peter M. Groen   Committed the ent...
1161
1162
1163
1164
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1165
  void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1166
  {
ca0cf29e   Steven   syntax fixes, con...
1167
1168
      LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) );
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1169
      {
ca0cf29e   Steven   syntax fixes, con...
1170
1171
1172
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
      }
  }
  
  //// static
  //void ClientPaho::onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode)
  //{
  //    apply_unused_parameters(properties);
  //    try {
  //        if (context) {
  //            auto* cl = reinterpret_cast<ClientPaho*>(context);
  //            cl->pushIncomingEvent([cl, reasonCode]() { cl->onDisconnectOnInstance(reasonCode); });
  //        }
  //    }
  //    catch (...) {
  //    }
  //    catch (const std::exception& e) {
  //        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - exception : %1", e.what());
  //    }
  //    catch (...) {
  //        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - unknown exception");
  //    }
  //}
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1197
  void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1198
  {
ca0cf29e   Steven   syntax fixes, con...
1199
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1200
      {
ca0cf29e   Steven   syntax fixes, con...
1201
1202
1203
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttSuccess resp( response ? response->token : 0 );
          cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1204
1205
1206
1207
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1208
  void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1209
  {
9421324b   Peter M. Groen   First fix on conn...
1210
      LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1211
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1212
      {
ca0cf29e   Steven   syntax fixes, con...
1213
1214
1215
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1216
1217
1218
1219
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1220
  void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1221
  {
ca0cf29e   Steven   syntax fixes, con...
1222
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1223
      {
ca0cf29e   Steven   syntax fixes, con...
1224
1225
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
51becbde   Peter M. Groen   Committed the ent...
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
          {
              // publish should always have a valid response struct.
              // toLogFile ("ClientPaho", "onPublishSuccess - no response data");
          }
          MqttSuccess resp(response->token, MqttMessage(response->alt.pub.destinationName == nullptr ? "null" : response->alt.pub.destinationName, response->alt.pub.message));
          cl->pushIncomingEvent([cl, resp]() { cl->onPublishSuccessOnInstance(resp); });
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1236
  void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1237
  {
ca0cf29e   Steven   syntax fixes, con...
1238
1239
      (void) response;
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1240
      {
ca0cf29e   Steven   syntax fixes, con...
1241
1242
1243
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1244
1245
1246
1247
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1248
  void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1249
  {
ca0cf29e   Steven   syntax fixes, con...
1250
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1251
      {
ca0cf29e   Steven   syntax fixes, con...
1252
1253
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
51becbde   Peter M. Groen   Committed the ent...
1254
1255
1256
1257
          {
              // subscribe should always have a valid response struct.
              // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data");
          }
ca0cf29e   Steven   syntax fixes, con...
1258
1259
          MqttSuccess resp( response->token, response->alt.qos );
          cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1260
1261
1262
1263
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1264
  void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1265
  {
ca0cf29e   Steven   syntax fixes, con...
1266
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1267
      {
ca0cf29e   Steven   syntax fixes, con...
1268
1269
1270
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1271
1272
1273
1274
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1275
  void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1276
  {
ca0cf29e   Steven   syntax fixes, con...
1277
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1278
      {
ca0cf29e   Steven   syntax fixes, con...
1279
1280
1281
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttSuccess resp( response ? response->token : -1 );
          cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1282
1283
1284
1285
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1286
  void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1287
  {
ca0cf29e   Steven   syntax fixes, con...
1288
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1289
      {
ca0cf29e   Steven   syntax fixes, con...
1290
1291
1292
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1293
1294
1295
1296
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1297
  int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message )
51becbde   Peter M. Groen   Committed the ent...
1298
1299
  {
  
ca0cf29e   Steven   syntax fixes, con...
1300
      OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]()
51becbde   Peter M. Groen   Committed the ent...
1301
      {
ca0cf29e   Steven   syntax fixes, con...
1302
1303
          MQTTAsync_freeMessage( &message );
          MQTTAsync_free( topicName );
51becbde   Peter M. Groen   Committed the ent...
1304
1305
      });
  
ca0cf29e   Steven   syntax fixes, con...
1306
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1307
      {
ca0cf29e   Steven   syntax fixes, con...
1308
1309
1310
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttMessage msg( topicName, *message );
          cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } );
51becbde   Peter M. Groen   Committed the ent...
1311
1312
1313
1314
1315
1316
      }
  
      return 1; // always return true. Otherwise this callback is triggered again.
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1317
  void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token )
51becbde   Peter M. Groen   Committed the ent...
1318
  {
ca0cf29e   Steven   syntax fixes, con...
1319
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1320
      {
ca0cf29e   Steven   syntax fixes, con...
1321
1322
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } );
51becbde   Peter M. Groen   Committed the ent...
1323
1324
1325
1326
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1327
  void ClientPaho::onConnectionLost( void* context, char* cause )
51becbde   Peter M. Groen   Committed the ent...
1328
  {
ca0cf29e   Steven   syntax fixes, con...
1329
      OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]()
51becbde   Peter M. Groen   Committed the ent...
1330
      {
ca0cf29e   Steven   syntax fixes, con...
1331
          if( cause )
51becbde   Peter M. Groen   Committed the ent...
1332
          {
ca0cf29e   Steven   syntax fixes, con...
1333
              MQTTAsync_free( cause );
51becbde   Peter M. Groen   Committed the ent...
1334
1335
1336
          }
      });
  
ca0cf29e   Steven   syntax fixes, con...
1337
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1338
      {
ca0cf29e   Steven   syntax fixes, con...
1339
1340
1341
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          std::string msg( nullptr == cause ? "cause unknown" : cause );
          cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } );
51becbde   Peter M. Groen   Committed the ent...
1342
1343
1344
1345
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1346
  void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message )
51becbde   Peter M. Groen   Committed the ent...
1347
  {
ca0cf29e   Steven   syntax fixes, con...
1348
1349
      (void) message;
      switch( level )
51becbde   Peter M. Groen   Committed the ent...
1350
1351
1352
      {
          case MQTTASYNC_TRACE_MAXIMUM:
          case MQTTASYNC_TRACE_MEDIUM:
ca0cf29e   Steven   syntax fixes, con...
1353
1354
          case MQTTASYNC_TRACE_MINIMUM:
          {
51becbde   Peter M. Groen   Committed the ent...
1355
1356
1357
              // ("ClientPaho", "paho - %1", message)
              break;
          }
ca0cf29e   Steven   syntax fixes, con...
1358
1359
          case MQTTASYNC_TRACE_PROTOCOL:
          {
51becbde   Peter M. Groen   Committed the ent...
1360
1361
1362
1363
1364
              // ("ClientPaho", "paho - %1", message)
              break;
          }
          case MQTTASYNC_TRACE_ERROR:
          case MQTTASYNC_TRACE_SEVERE:
ca0cf29e   Steven   syntax fixes, con...
1365
1366
          case MQTTASYNC_TRACE_FATAL:
          {
51becbde   Peter M. Groen   Committed the ent...
1367
1368
1369
1370
1371
              // ("ClientPaho", "paho - %1", message)
              break;
          }
      }
  }