Blame view

src/clientpaho.cpp 49.7 KB
b5d9e433   Peter M. Groen   Fixed License Hea...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  /* ****************************************************************************
   * Copyright 2019 Open Systems Development BV                                 *
   *                                                                            *
   * Permission is hereby granted, free of charge, to any person obtaining a    *
   * copy of this software and associated documentation files (the "Software"), *
   * to deal in the Software without restriction, including without limitation  *
   * the rights to use, copy, modify, merge, publish, distribute, sublicense,   *
   * and/or sell copies of the Software, and to permit persons to whom the      *
   * Software is furnished to do so, subject to the following conditions:       *
   *                                                                            *
   * The above copyright notice and this permission notice shall be included in *
   * all copies or substantial portions of the Software.                        *
   *                                                                            *
   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,   *
   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL    *
   * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING    *
   * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER        *
   * DEALINGS IN THE SOFTWARE.                                                  *
   * ***************************************************************************/
51becbde   Peter M. Groen   Committed the ent...
22
23
24
25
26
  #include "clientpaho.h"
  
  #include "errorcode.h"
  #include "mqttutil.h"
  #include "lockguard.h"
9421324b   Peter M. Groen   First fix on conn...
27
  #include "log.h"
51becbde   Peter M. Groen   Committed the ent...
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
  #include "metaprogrammingdefs.h"
  #include "mqttstream.h"
  #include "scopeguard.h"
  #include "uriparser.h"
  
  // std::chrono
  #include "compat-chrono.h"
  
  // std
  #include <algorithm>
  #include <iterator>
  
  using namespace osdev::components::mqtt;
  
  namespace {
  
  #if defined(__clang__)
  #pragma GCC diagnostic push
  #pragma GCC diagnostic ignored "-Wunused-template"
  #endif
  
  OSDEV_COMPONENTS_HASMEMBER_TRAIT(onSuccess5)
  
  // Overload enabled when has_onSuccess5<TRet>::value is false.
  // Returns a TRet initialised from the MQTTAsync_disconnectOptions_initializer macro.
  // The unnamed pointer parameter is never read; it only carries the type for overload selection.
  template <typename TRet>
  inline typename std::enable_if<!has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
  {
      return MQTTAsync_disconnectOptions_initializer;
  }
  
  // Overload enabled when has_onSuccess5<TRet>::value is true.
  // Returns a TRet initialised with MQTTAsync_disconnectOptions_initializer5 when that macro exists;
  // otherwise starts from MQTTAsync_disconnectOptions_initializer and sets struct_version,
  // onSuccess5 and onFailure5 by hand.
  // The unnamed pointer parameter is never read; it only carries the type for overload selection.
  template <typename TRet>
  inline typename std::enable_if<has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
  {
  // For some reason g++ on centos7 evaluates the function body even when it is discarded by SFINAE.
  // This leads to a compile error on an undefined symbol. We will use the old initializer macro, but this
  // method should not be chosen when the struct does not contain member onSuccess5!
  // On yocto warrior mqtt-paho-c 1.3.0 the macro MQTTAsync_disconnectOptions_initializer5 is not defined.
  // while the struct does have an onSuccess5 member. In that case we do need correct initializer code.
  // We fall back to the MQTTAsync_disconnectOptions_initializer macro and initialize
  // additional fields ourselves (which unfortunately results in a pesky compiler warning about missing field initializers).
  #ifndef MQTTAsync_disconnectOptions_initializer5
  #pragma GCC diagnostic push
  #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
      TRet ret = MQTTAsync_disconnectOptions_initializer;
      ret.struct_version = 1;     // version 1 is set together with the *5 callback fields below
      ret.onSuccess5 = nullptr;
      ret.onFailure5 = nullptr;
      return ret;
  #pragma GCC diagnostic pop
  #else
      return MQTTAsync_disconnectOptions_initializer5;
  #endif
  }
  
  // Convenience wrapper: Init<T>::initialize() yields an initialised T by
  // dispatching to the matching initializeMqttStruct overload.
  template <typename TRet>
  struct Init
  {
      static TRet initialize()
      {
          // A typed null pointer is passed purely to select the overload; it is never dereferenced.
          TRet* const typeTag = nullptr;
          return initializeMqttStruct<TRet>(typeTag);
      }
  };
  #if defined(__clang__)
  #pragma GCC diagnostic pop
  #endif
  
  } // namespace
  
  std::atomic_int ClientPaho::s_numberOfInstances(0);
  
ca0cf29e   Steven   syntax fixes, con...
97
  // Creates a Paho async client for the given endpoint and client id.
  //
  // _endpoint                 endpoint string, handed to parseEndpoint().
  // _id                       client id, stored in m_clientId.
  // connectionStatusCallback  stored in m_connectionStatusCallback.
  // deliveryCompleteCallback  stored in m_deliveryCompleteCallback.
  //
  // The first live instance installs the Paho trace callback. When MQTTAsync_create
  // succeeds the Paho callbacks are registered and the worker thread is started;
  // when it fails only an error is logged.
  ClientPaho::ClientPaho( const std::string& _endpoint,
      const std::string& _id,
      const std::function<void( const std::string&, ConnectionStatus )>& connectionStatusCallback,
      const std::function<void( const std::string& clientId, std::int32_t pubMsgToken )>& deliveryCompleteCallback )
      : m_mutex()
      , m_endpoint()
      , m_username()
      , m_password()
      , m_clientId(_id)
      , m_pendingOperations()
      , m_operationResult()
      , m_operationsCompleteCV()
      , m_subscriptions()
      , m_pendingSubscriptions()
      , m_subscribeTokenToTopic()
      , m_unsubscribeTokenToTopic()
      , m_pendingPublishes()
      , m_processPendingPublishes(false)
      , m_pendingPublishesReadyCV()
      , m_client()
      , m_connectionStatus(ConnectionStatus::Disconnected)
      , m_connectionStatusCallback(connectionStatusCallback)
      , m_deliveryCompleteCallback(deliveryCompleteCallback)
      , m_lastUnsubscribe(-1)
      , m_connectPromise()
      , m_disconnectPromise()
      , m_callbackEventQueue(m_clientId)
      , m_workerThread()
  {
      // Post-increment yields the count before this instance was added.
      const int instancesBefore = s_numberOfInstances++;
      if( 0 == instancesBefore )
      {
          MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho );
      }

      LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) );
      parseEndpoint(_endpoint);

      const auto createResult = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr );
      if( MQTTASYNC_SUCCESS != createResult )
      {
          LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( createResult ) ) );
          return;
      }

      // The client exists: route Paho events to this instance and start the worker thread.
      MQTTAsync_setCallbacks( m_client, static_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete );
      m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this );
  }
  
  // Tears the client down: unsubscribes and disconnects when still connected,
  // removes the Paho trace callback when the last instance goes away, destroys
  // the Paho client and finally stops and joins the worker thread.
  ClientPaho::~ClientPaho()
  {
      LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) );

      const bool stillConnected = MQTTAsync_isConnected( m_client );
      if( !stillConnected )
      {
          // If the status was already disconnected this call does nothing
          setConnectionStatus( ConnectionStatus::Disconnected );
      }
      else
      {
          unsubscribeAll();

          // An empty token set is passed; wait at most 2 seconds before disconnecting.
          waitForCompletion( std::chrono::milliseconds(2000), std::set<int32_t>{} );
          disconnect( true, 5000 );
      }

      const int instancesLeft = --s_numberOfInstances;
      if( 0 == instancesLeft )
      {
          // encountered a case where termination of the logging system within paho led to a segfault.
          // This was a paho thread that was cleaned while at the same time the logging system was terminated.
          // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less
          // frequently.
          MQTTAsync_setTraceCallback( nullptr );
      }

      MQTTAsync_destroy( &m_client );

      // Stop the event queue first, then join the worker thread.
      m_callbackEventQueue.stop();
      if( m_workerThread.joinable() )
      {
          m_workerThread.join();
      }
  }
  
  // Returns a copy of the client id this instance was constructed with.
  std::string ClientPaho::clientId() const
  {
      return m_clientId;
  }
  
  // Returns the current value of m_connectionStatus.
  // NOTE(review): the value is read without taking m_mutex - confirm the member type makes that safe.
  ConnectionStatus ClientPaho::connectionStatus() const
  {
      return m_connectionStatus;
  }
  
31eece9b   Steven   added LWT ( last ...
190
  std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
51becbde   Peter M. Groen   Committed the ent...
191
192
193
  {
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
2b967ea7   Steven   promise is not wo...
194
          if( ConnectionStatus::Disconnected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
195
196
197
          {
              return -1;
          }
2b967ea7   Steven   promise is not wo...
198
          setConnectionStatus( ConnectionStatus::ConnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
199
200
      }
  
76d01373   Peter M. Groen   Fix on connection
201
202
      LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) );
  
51becbde   Peter M. Groen   Committed the ent...
203
      MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
9421324b   Peter M. Groen   First fix on conn...
204
      conn_opts.keepAliveInterval = 5;
51becbde   Peter M. Groen   Committed the ent...
205
      conn_opts.cleansession = 1;
2c0c99a5   Peter M. Groen   Fix connect Callb...
206
      conn_opts.onSuccess = nullptr;
51becbde   Peter M. Groen   Committed the ent...
207
208
209
      conn_opts.onFailure = &ClientPaho::onConnectFailure;
      conn_opts.context = this;
      conn_opts.automaticReconnect = 1;
31eece9b   Steven   added LWT ( last ...
210
  
76d01373   Peter M. Groen   Fix on connection
211
212
213
214
      // Make sure we get a signal if the promise is fulfilled
      auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect );
      if( MQTTASYNC_SUCCESS == ccb )
      {
ca0cf29e   Steven   syntax fixes, con...
215
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
76d01373   Peter M. Groen   Fix on connection
216
217
218
      }
      else
      {
ca0cf29e   Steven   syntax fixes, con...
219
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
76d01373   Peter M. Groen   Fix on connection
220
221
222
      }
  
      // Setup the last will and testament, if so desired.
31eece9b   Steven   added LWT ( last ...
223
224
225
226
227
228
229
      if( !lwt.topic().empty() )
      {
          MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
          will_opts.message = lwt.message().c_str();
          will_opts.topicName = lwt.topic().c_str();
  
          conn_opts.will = &will_opts;
76d01373   Peter M. Groen   Fix on connection
230
231
  
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) );
31eece9b   Steven   added LWT ( last ...
232
233
234
235
236
237
      }
      else
      {
          conn_opts.will = nullptr;
      }
  
2b967ea7   Steven   promise is not wo...
238
      if( !m_username.empty() )
51becbde   Peter M. Groen   Committed the ent...
239
240
241
242
      {
          conn_opts.username = m_username.c_str();
      }
  
2b967ea7   Steven   promise is not wo...
243
      if( !m_password.empty() )
51becbde   Peter M. Groen   Committed the ent...
244
245
246
247
248
249
250
      {
          conn_opts.password = m_password.c_str();
      }
  
      std::promise<void> waitForConnectPromise{};
      auto waitForConnect = waitForConnectPromise.get_future();
      m_connectPromise.reset();
2b967ea7   Steven   promise is not wo...
251
      if( wait )
51becbde   Peter M. Groen   Committed the ent...
252
      {
2b967ea7   Steven   promise is not wo...
253
          m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) );
51becbde   Peter M. Groen   Committed the ent...
254
255
256
      }
  
      {
2b967ea7   Steven   promise is not wo...
257
258
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
          if( !m_pendingOperations.insert( -100 ).second )
51becbde   Peter M. Groen   Committed the ent...
259
260
261
          {
              // Write something
          }
2b967ea7   Steven   promise is not wo...
262
          m_operationResult.erase( -100 );
51becbde   Peter M. Groen   Committed the ent...
263
264
      }
  
2b967ea7   Steven   promise is not wo...
265
266
      int rc = MQTTAsync_connect( m_client, &conn_opts );
      if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
267
      {
2b967ea7   Steven   promise is not wo...
268
269
          setConnectionStatus( ConnectionStatus::Disconnected );
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
270
271
272
273
          m_operationResult[-100] = false;
          m_pendingOperations.erase(-100);
      }
  
2b967ea7   Steven   promise is not wo...
274
      if( wait )
51becbde   Peter M. Groen   Committed the ent...
275
276
277
278
279
280
281
      {
          waitForConnect.get();
          m_connectPromise.reset();
      }
      return -100;
  }
  
0c424e03   Steven   syntax fixes. WIP
282
  std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
51becbde   Peter M. Groen   Committed the ent...
283
284
285
286
287
  {
      ConnectionStatus currentStatus = m_connectionStatus;
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
0c424e03   Steven   syntax fixes. WIP
288
289
          if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus )
          {
51becbde   Peter M. Groen   Committed the ent...
290
291
292
293
              return -1;
          }
  
          currentStatus = m_connectionStatus;
0c424e03   Steven   syntax fixes. WIP
294
          setConnectionStatus( ConnectionStatus::DisconnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
295
296
297
298
299
300
301
302
303
304
305
      }
  
      MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize();
      disconn_opts.timeout = timeoutMs;
      disconn_opts.onSuccess = &ClientPaho::onDisconnectSuccess;
      disconn_opts.onFailure = &ClientPaho::onDisconnectFailure;
      disconn_opts.context = this;
  
      std::promise<void> waitForDisconnectPromise{};
      auto waitForDisconnect = waitForDisconnectPromise.get_future();
      m_disconnectPromise.reset();
0c424e03   Steven   syntax fixes. WIP
306
307
      if( wait )
      {
51becbde   Peter M. Groen   Committed the ent...
308
309
310
311
312
          m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise));
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
313
          if( !m_pendingOperations.insert( -200 ).second )
51becbde   Peter M. Groen   Committed the ent...
314
          {
ca0cf29e   Steven   syntax fixes, con...
315
              LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
316
          }
ca0cf29e   Steven   syntax fixes, con...
317
          m_operationResult.erase( -200 );
51becbde   Peter M. Groen   Committed the ent...
318
319
      }
  
ca0cf29e   Steven   syntax fixes, con...
320
      int rc = MQTTAsync_disconnect( m_client, &disconn_opts );
0c424e03   Steven   syntax fixes. WIP
321
      if( MQTTASYNC_SUCCESS != rc )
76d01373   Peter M. Groen   Fix on connection
322
      {
0c424e03   Steven   syntax fixes. WIP
323
          if( MQTTASYNC_DISCONNECTED == rc )
76d01373   Peter M. Groen   Fix on connection
324
          {
51becbde   Peter M. Groen   Committed the ent...
325
326
327
              currentStatus = ConnectionStatus::Disconnected;
          }
  
0c424e03   Steven   syntax fixes. WIP
328
329
          setConnectionStatus( currentStatus );
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
330
          m_operationResult[-200] = false;
ca0cf29e   Steven   syntax fixes, con...
331
          m_pendingOperations.erase( -200 );
51becbde   Peter M. Groen   Committed the ent...
332
  
0c424e03   Steven   syntax fixes. WIP
333
          if( MQTTASYNC_DISCONNECTED == rc )
2b967ea7   Steven   promise is not wo...
334
          {
51becbde   Peter M. Groen   Committed the ent...
335
336
              return -1;
          }
ca0cf29e   Steven   syntax fixes, con...
337
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
338
339
      }
  
2b967ea7   Steven   promise is not wo...
340
341
      if( wait )
      {
51becbde   Peter M. Groen   Committed the ent...
342
343
          if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
          {
ca0cf29e   Steven   syntax fixes, con...
344
              LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) );
51becbde   Peter M. Groen   Committed the ent...
345
346
347
348
349
350
351
          }
          waitForDisconnect.get();
          m_disconnectPromise.reset();
      }
      return -200;
  }
  
ca0cf29e   Steven   syntax fixes, con...
352
  std::int32_t ClientPaho::publish( const MqttMessage& message, int qos )
51becbde   Peter M. Groen   Committed the ent...
353
  {
0c424e03   Steven   syntax fixes. WIP
354
      if( ConnectionStatus::DisconnectInProgress == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
355
      {
ca0cf29e   Steven   syntax fixes, con...
356
          LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) );
51becbde   Peter M. Groen   Committed the ent...
357
358
          return -1;
      }
0c424e03   Steven   syntax fixes. WIP
359
      else if( ConnectionStatus::Disconnected == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
360
      {
ca0cf29e   Steven   syntax fixes, con...
361
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) );
2b967ea7   Steven   promise is not wo...
362
          connect( true );
51becbde   Peter M. Groen   Committed the ent...
363
364
      }
  
0c424e03   Steven   syntax fixes. WIP
365
      if( !isValidTopic(message.topic() ) )
51becbde   Peter M. Groen   Committed the ent...
366
      {
ca0cf29e   Steven   syntax fixes, con...
367
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) );
51becbde   Peter M. Groen   Committed the ent...
368
369
      }
  
0c424e03   Steven   syntax fixes. WIP
370
      if( qos > 2 )
51becbde   Peter M. Groen   Committed the ent...
371
372
373
      {
          qos = 2;
      }
0c424e03   Steven   syntax fixes. WIP
374
      else if( qos < 0 )
51becbde   Peter M. Groen   Committed the ent...
375
376
377
378
      {
          qos = 0;
      }
  
51becbde   Peter M. Groen   Committed the ent...
379
      std::unique_lock<std::mutex> lck(m_mutex);
9802eeb9   Steven   statement changes
380
      if( ConnectionStatus::Connected != m_connectionStatus || m_processPendingPublishes )
a670240b   Peter M. Groen   Fix on connection
381
      {
51becbde   Peter M. Groen   Committed the ent...
382
          m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
9802eeb9   Steven   statement changes
383
          if( ConnectionStatus::Connected != m_connectionStatus )
a670240b   Peter M. Groen   Fix on connection
384
          {
0c424e03   Steven   syntax fixes. WIP
385
              LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." );
ca0cf29e   Steven   syntax fixes, con...
386
              m_pendingPublishes.push_front( Publish{ qos, message } );
51becbde   Peter M. Groen   Committed the ent...
387
388
389
390
              return -1;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
391
      return publishInternal( message, qos );
51becbde   Peter M. Groen   Committed the ent...
392
393
394
395
396
397
  }
  
  void ClientPaho::publishPending()
  {
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
0c424e03   Steven   syntax fixes. WIP
398
          if( !m_processPendingPublishes )
2b967ea7   Steven   promise is not wo...
399
          {
51becbde   Peter M. Groen   Committed the ent...
400
401
402
403
              return;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
404
      if( ConnectionStatus::Connected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
405
      {
a670240b   Peter M. Groen   Fix on connection
406
          LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) )
51becbde   Peter M. Groen   Committed the ent...
407
408
      }
  
0c424e03   Steven   syntax fixes. WIP
409
      while( !m_pendingPublishes.empty() )
51becbde   Peter M. Groen   Committed the ent...
410
411
      {
          const auto& pub = m_pendingPublishes.back();
ca0cf29e   Steven   syntax fixes, con...
412
          publishInternal( pub.data, pub.qos );
51becbde   Peter M. Groen   Committed the ent...
413
414
415
416
417
418
419
420
421
422
423
  
          m_pendingPublishes.pop_back();
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          m_processPendingPublishes = false;
      }
      m_pendingPublishesReadyCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
424
  std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb )
51becbde   Peter M. Groen   Committed the ent...
425
  {
0c424e03   Steven   syntax fixes. WIP
426
      if( ConnectionStatus::Connected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
427
      {
11fe0b09   Peter M. Groen   Rework for subscr...
428
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) );
51becbde   Peter M. Groen   Committed the ent...
429
430
      }
  
0c424e03   Steven   syntax fixes. WIP
431
      if( !isValidTopic( topic ) )
51becbde   Peter M. Groen   Committed the ent...
432
433
      {
          // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
11fe0b09   Peter M. Groen   Rework for subscr...
434
435
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) );
          return -1;
51becbde   Peter M. Groen   Committed the ent...
436
437
      }
  
0c424e03   Steven   syntax fixes. WIP
438
      if( qos > 2 )
51becbde   Peter M. Groen   Committed the ent...
439
440
441
      {
          qos = 2;
      }
0c424e03   Steven   syntax fixes. WIP
442
      else if( qos < 0 )
51becbde   Peter M. Groen   Committed the ent...
443
444
445
446
447
448
449
450
      {
          qos = 0;
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  
          auto itExisting = m_subscriptions.find(topic);
0c424e03   Steven   syntax fixes. WIP
451
452
453
454
          if( m_subscriptions.end() != itExisting )
          {
              if( itExisting->second.qos == qos )
              {
51becbde   Peter M. Groen   Committed the ent...
455
456
457
458
459
460
                  return -1;
              }
              // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic);
          }
  
          auto itPending = m_pendingSubscriptions.find(topic);
0c424e03   Steven   syntax fixes. WIP
461
462
463
464
465
466
467
          if( m_pendingSubscriptions.end() != itPending )
          {
              if( itPending->second.qos == qos )
              {
                  auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; } );
                  if( m_subscribeTokenToTopic.end() != itToken )
                  {
51becbde   Peter M. Groen   Committed the ent...
468
469
                      return itToken->first;
                  }
0c424e03   Steven   syntax fixes. WIP
470
471
                  else
                  {
51becbde   Peter M. Groen   Committed the ent...
472
473
474
475
476
477
478
                      return -1;
                  }
              }
              // (OverlappingTopicException, "pending subscription with same topic, but different qos", topic);
          }
  
          std::string existingTopic{};
0c424e03   Steven   syntax fixes. WIP
479
          if( isOverlappingInternal( topic, existingTopic ) )
51becbde   Peter M. Groen   Committed the ent...
480
481
          {
              // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
11fe0b09   Peter M. Groen   Rework for subscr...
482
              LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
483
484
          }
  
ca0cf29e   Steven   syntax fixes, con...
485
486
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
          m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) );
51becbde   Peter M. Groen   Committed the ent...
487
      }
0c424e03   Steven   syntax fixes. WIP
488
      return subscribeInternal( topic, qos );
51becbde   Peter M. Groen   Committed the ent...
489
490
491
492
  }
  
  void ClientPaho::resubscribe()
  {
ca0cf29e   Steven   syntax fixes, con...
493
      decltype( m_pendingSubscriptions ) pendingSubscriptions{};
51becbde   Peter M. Groen   Committed the ent...
494
495
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
496
          std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) );
51becbde   Peter M. Groen   Committed the ent...
497
498
      }
  
0c424e03   Steven   syntax fixes. WIP
499
      for( const auto& s : pendingSubscriptions )
51becbde   Peter M. Groen   Committed the ent...
500
      {
0c424e03   Steven   syntax fixes. WIP
501
          subscribeInternal( s.first, s.second.qos );
51becbde   Peter M. Groen   Committed the ent...
502
503
504
      }
  }
  
31eece9b   Steven   added LWT ( last ...
505
  std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos )
51becbde   Peter M. Groen   Committed the ent...
506
507
  {
      {
0c424e03   Steven   syntax fixes. WIP
508
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
509
          bool found = false;
0c424e03   Steven   syntax fixes. WIP
510
          for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
511
          {
0c424e03   Steven   syntax fixes. WIP
512
              if( topic == s.first && qos == s.second.qos )
51becbde   Peter M. Groen   Committed the ent...
513
514
515
516
517
              {
                  found = true;
                  break;
              }
          }
2569446f   Peter M. Groen   Added extra subsc...
518
519
520
521
522
523
524
525
526
527
528
529
530
  
          if(!found)  // Probably not found in subscriptions, also check the pendings.
          {
              for( const auto &s : m_pendingSubscriptions )
              {
                  if( topic == s.first && qos == s.second.qos )
                  {
                      found = true;
                      break;
                  }
              }
          }
  
0c424e03   Steven   syntax fixes. WIP
531
          if( !found )
51becbde   Peter M. Groen   Committed the ent...
532
533
534
535
536
537
538
539
540
541
542
543
544
545
          {
              return -1;
          }
      }
  
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onUnsubscribeSuccess;
      opts.onFailure = &ClientPaho::onUnsubscribeFailure;
      opts.context = this;
  
      {
          // Need to lock the mutex because it is possible that the callback is faster than
          // the insertion of the token into the pending operations.
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
546
          auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts );
0c424e03   Steven   syntax fixes. WIP
547
          if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
548
          {
ca0cf29e   Steven   syntax fixes, con...
549
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
550
551
          }
  
0c424e03   Steven   syntax fixes. WIP
552
          if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
553
          {
ca0cf29e   Steven   syntax fixes, con...
554
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token )  + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
555
556
          }
  
0c424e03   Steven   syntax fixes. WIP
557
558
          m_operationResult.erase( opts.token );
          if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 )
51becbde   Peter M. Groen   Committed the ent...
559
          {
ca0cf29e   Steven   syntax fixes, con...
560
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic  " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
561
562
563
564
565
566
567
568
569
570
571
572
573
          }
          m_lastUnsubscribe = opts.token; // centos7 workaround
          m_unsubscribeTokenToTopic[opts.token] = topic;
      }
  
      // Because of a bug in paho-c on centos7 the unsubscribes need to be sequential (best effort).
      this->waitForCompletion(std::chrono::seconds(1), std::set<int32_t>{ opts.token });
  
      return opts.token;
  }
  
  void ClientPaho::unsubscribeAll()
  {
0c424e03   Steven   syntax fixes. WIP
574
      decltype( m_subscriptions ) subscriptions{};
51becbde   Peter M. Groen   Committed the ent...
575
576
577
578
579
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          subscriptions = m_subscriptions;
      }
  
0c424e03   Steven   syntax fixes. WIP
580
581
      for( const auto& s : subscriptions )
      {
ca0cf29e   Steven   syntax fixes, con...
582
          this->unsubscribe( s.first, s.second.qos );
51becbde   Peter M. Groen   Committed the ent...
583
584
585
586
587
      }
  }
  
  std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const
  {
ca0cf29e   Steven   syntax fixes, con...
588
589
590
      if( waitFor <= std::chrono::milliseconds( 0 ) )
      {
          return std::chrono::milliseconds( 0 );
51becbde   Peter M. Groen   Committed the ent...
591
592
593
      }
      std::chrono::milliseconds timeElapsed{};
      {
ca0cf29e   Steven   syntax fixes, con...
594
          osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds )
51becbde   Peter M. Groen   Committed the ent...
595
          {
ca0cf29e   Steven   syntax fixes, con...
596
              timeElapsed = std::chrono::ceil<std::chrono::milliseconds>( sinceStart );
51becbde   Peter M. Groen   Committed the ent...
597
598
          });
          std::unique_lock<std::mutex> lck(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
599
  
51becbde   Peter M. Groen   Committed the ent...
600
601
602
          // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations);
          m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]()
          {
ca0cf29e   Steven   syntax fixes, con...
603
              if( tokens.empty() )
51becbde   Peter M. Groen   Committed the ent...
604
605
606
              { // wait for all operations to end
                  return m_pendingOperations.empty();
              }
ca0cf29e   Steven   syntax fixes, con...
607
              else if( tokens.size() == 1 )
51becbde   Peter M. Groen   Committed the ent...
608
              {
ca0cf29e   Steven   syntax fixes, con...
609
                  return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end();
51becbde   Peter M. Groen   Committed the ent...
610
611
612
613
614
615
616
617
618
              }
              std::vector<std::int32_t> intersect{};
              std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect));
              return intersect.empty();
          } );
      }
      return timeElapsed;
  }
  
ca0cf29e   Steven   syntax fixes, con...
619
  bool ClientPaho::isOverlapping( const std::string& topic ) const
51becbde   Peter M. Groen   Committed the ent...
620
621
  {
      std::string existingTopic{};
ca0cf29e   Steven   syntax fixes, con...
622
      return isOverlapping( topic, existingTopic );
51becbde   Peter M. Groen   Committed the ent...
623
624
  }
  
ca0cf29e   Steven   syntax fixes, con...
625
  bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const
51becbde   Peter M. Groen   Committed the ent...
626
627
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
628
      return isOverlappingInternal( topic, existingTopic );
51becbde   Peter M. Groen   Committed the ent...
629
630
631
632
633
634
635
  }
  
  // Returns a copy of the tokens of all operations that are pending at the moment of the call.
  // The copy is made while m_mutex is held.
  std::vector<std::int32_t> ClientPaho::pendingOperations() const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      return std::vector<std::int32_t>( m_pendingOperations.begin(), m_pendingOperations.end() );
  }
  
  // Returns true when at least one subscription is registered in m_pendingSubscriptions.
  // The check is done while m_mutex is held.
  bool ClientPaho::hasPendingSubscriptions() const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      const bool nothingPending = m_pendingSubscriptions.empty();
      return !nothingPending;
  }
  
ca0cf29e   Steven   syntax fixes, con...
646
  // Looks up the stored result of the operation identified by token.
  // Returns an empty optional when no result is stored for the token,
  // otherwise the stored boolean result.
  boost::optional<bool> ClientPaho::operationResult( std::int32_t token ) const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);

      const auto stored = m_operationResult.find( token );
      if( stored == m_operationResult.end() )
      {
          return boost::optional<bool>();
      }
      return boost::optional<bool>( stored->second );
  }
  
ca0cf29e   Steven   syntax fixes, con...
658
  void ClientPaho::parseEndpoint( const std::string& _endpoint )
51becbde   Peter M. Groen   Committed the ent...
659
  {
ca0cf29e   Steven   syntax fixes, con...
660
661
      auto ep = UriParser::parse( _endpoint );
      if( ep.find( "user" ) != ep.end() )
51becbde   Peter M. Groen   Committed the ent...
662
663
664
665
666
      {
          m_username = ep["user"];
          ep["user"].clear();
      }
  
ca0cf29e   Steven   syntax fixes, con...
667
      if( ep.find( "password" ) != ep.end() )
51becbde   Peter M. Groen   Committed the ent...
668
669
670
671
      {
          m_password = ep["password"];
          ep["password"].clear();
      }
ca0cf29e   Steven   syntax fixes, con...
672
      m_endpoint = UriParser::toString( ep );
51becbde   Peter M. Groen   Committed the ent...
673
674
  }
  
ca0cf29e   Steven   syntax fixes, con...
675
  std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos )
51becbde   Peter M. Groen   Committed the ent...
676
677
678
679
680
681
682
683
684
685
686
687
  {
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onPublishSuccess;
      opts.onFailure = &ClientPaho::onPublishFailure;
      opts.context = this;
      auto msg = message.toAsyncMessage();
      msg.qos = qos;
  
      // Need to lock the mutex because it is possible that the callback is faster than
      // the insertion of the token into the pending operations.
  
      // OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
688
      auto rc = MQTTAsync_sendMessage( m_client, message.topic().c_str(), &msg, &opts );
ca0cf29e   Steven   syntax fixes, con...
689
      if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
690
      {
ca0cf29e   Steven   syntax fixes, con...
691
          LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
692
693
      }
  
ca0cf29e   Steven   syntax fixes, con...
694
      if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
695
      {
d557d523   Peter M. Groen   Fix deferred subs...
696
          // LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
697
      }
ca0cf29e   Steven   syntax fixes, con...
698
      m_operationResult.erase( opts.token );
51becbde   Peter M. Groen   Committed the ent...
699
700
701
      return opts.token;
  }
  
ca0cf29e   Steven   syntax fixes, con...
702
  std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos )
51becbde   Peter M. Groen   Committed the ent...
703
704
705
706
707
708
709
710
711
  {
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onSubscribeSuccess;
      opts.onFailure = &ClientPaho::onSubscribeFailure;
      opts.context = this;
  
      // Need to lock the mutex because it is possible that the callback is faster than
      // the insertion of the token into the pending operations.
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
712
      auto rc = MQTTAsync_subscribe( m_client, topic.c_str(), qos, &opts );
51becbde   Peter M. Groen   Committed the ent...
713
714
      if (MQTTASYNC_SUCCESS != rc)
      {
ca0cf29e   Steven   syntax fixes, con...
715
716
          m_pendingSubscriptions.erase( topic );
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
717
718
      }
  
ca0cf29e   Steven   syntax fixes, con...
719
      if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
720
      {
ca0cf29e   Steven   syntax fixes, con...
721
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
722
      }
ca0cf29e   Steven   syntax fixes, con...
723
724
      m_operationResult.erase( opts.token );
      if( m_subscribeTokenToTopic.count( opts.token ) > 0 )
51becbde   Peter M. Groen   Committed the ent...
725
      {
ca0cf29e   Steven   syntax fixes, con...
726
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
727
728
729
730
731
      }
      m_subscribeTokenToTopic[opts.token] = topic;
      return opts.token;
  }
  
0c424e03   Steven   syntax fixes. WIP
732
  // Stores the new connection status and, when the status actually changed and a
  // connection status callback is installed, reports the new status through that callback.
  void ClientPaho::setConnectionStatus( ConnectionStatus status )
  {
      LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - " ) );

      const ConnectionStatus previous = m_connectionStatus;
      m_connectionStatus = status;

      if( status == previous )
      {
          return; // nothing changed, nobody needs to be informed.
      }

      if( m_connectionStatusCallback )
      {
          LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - Calling m_connectionStatusCallback" ) );
          m_connectionStatusCallback( m_clientId, status );
      }
  }
  
0c424e03   Steven   syntax fixes. WIP
744
  bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const
51becbde   Peter M. Groen   Committed the ent...
745
746
  {
      existingTopic.clear();
0c424e03   Steven   syntax fixes. WIP
747
      for( const auto& s : m_pendingSubscriptions )
51becbde   Peter M. Groen   Committed the ent...
748
      {
0c424e03   Steven   syntax fixes. WIP
749
          if( testForOverlap( s.first, topic ) )
51becbde   Peter M. Groen   Committed the ent...
750
751
752
753
754
755
          {
              existingTopic = s.first;
              return true;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
756
      for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
757
      {
ed30280f   Steven   last changes.
758
          if( testForOverlap( s.first, topic ) )
51becbde   Peter M. Groen   Committed the ent...
759
760
761
762
763
764
765
766
          {
              existingTopic = s.first;
              return true;
          }
      }
      return false;
  }
  
ed30280f   Steven   last changes.
767
  void ClientPaho::pushIncomingEvent( std::function<void()> ev )
51becbde   Peter M. Groen   Committed the ent...
768
  {
ed30280f   Steven   last changes.
769
      m_callbackEventQueue.push( ev );
51becbde   Peter M. Groen   Committed the ent...
770
771
772
773
  }
  
  void ClientPaho::callbackEventHandler()
  {
ca0cf29e   Steven   syntax fixes, con...
774
      LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) );
0c424e03   Steven   syntax fixes. WIP
775
776
      for( ;; )
      {
51becbde   Peter M. Groen   Committed the ent...
777
          std::vector<std::function<void()>> events;
0c424e03   Steven   syntax fixes. WIP
778
          if( !m_callbackEventQueue.pop(events) )
51becbde   Peter M. Groen   Committed the ent...
779
780
781
782
          {
              break;
          }
  
0c424e03   Steven   syntax fixes. WIP
783
          for( const auto& ev : events )
51becbde   Peter M. Groen   Committed the ent...
784
785
          {
              ev();
51becbde   Peter M. Groen   Committed the ent...
786
787
          }
      }
ed30280f   Steven   last changes.
788
      LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - leaving callback event handler" ) );
51becbde   Peter M. Groen   Committed the ent...
789
  }
0c424e03   Steven   syntax fixes. WIP
790
  void ClientPaho::onConnectOnInstance( const std::string& cause )
51becbde   Peter M. Groen   Committed the ent...
791
  {
ca0cf29e   Steven   syntax fixes, con...
792
      (void) cause;
51becbde   Peter M. Groen   Committed the ent...
793
794
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
795
          std::copy( m_subscriptions.begin(), m_subscriptions.end(), std::inserter( m_pendingSubscriptions, m_pendingSubscriptions.end() ) );
51becbde   Peter M. Groen   Committed the ent...
796
797
798
799
          m_subscriptions.clear();
          m_processPendingPublishes = true; // all publishes are on hold until publishPending is called.
      }
  
ed30280f   Steven   last changes.
800
      setConnectionStatus( ConnectionStatus::Connected );
51becbde   Peter M. Groen   Committed the ent...
801
802
  }
  
2c0c99a5   Peter M. Groen   Fix connect Callb...
803
  void ClientPaho::onConnectSuccessOnInstance()
51becbde   Peter M. Groen   Committed the ent...
804
  {
9802eeb9   Steven   statement changes
805
806
      m_processPendingPublishes = true;
  
d557d523   Peter M. Groen   Fix deferred subs...
807
808
      LogDebug( "[ClientPaho::onConnectSuccessOnInstance]",
                std::string( m_clientId + " - onConnectSuccessOnInstance triggered." ) );
51becbde   Peter M. Groen   Committed the ent...
809
810
811
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // Register the connect callback that is used in reconnect scenarios.
0c424e03   Steven   syntax fixes. WIP
812
813
          auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect );
          if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
814
          {
0c424e03   Steven   syntax fixes. WIP
815
              LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
51becbde   Peter M. Groen   Committed the ent...
816
          }
2c0c99a5   Peter M. Groen   Fix connect Callb...
817
  
51becbde   Peter M. Groen   Committed the ent...
818
819
820
821
822
823
          // For MQTTV5
          //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
          //if (MQTTASYNC_SUCCESS != rc) {
          //    // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
          //}
          // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
2c0c99a5   Peter M. Groen   Fix connect Callb...
824
  
51becbde   Peter M. Groen   Committed the ent...
825
826
827
          m_operationResult[-100] = true;
          m_pendingOperations.erase(-100);
      }
9421324b   Peter M. Groen   First fix on conn...
828
  
0c424e03   Steven   syntax fixes. WIP
829
830
      setConnectionStatus( ConnectionStatus::Connected );
      if( m_connectPromise )
51becbde   Peter M. Groen   Committed the ent...
831
      {
ca0cf29e   Steven   syntax fixes, con...
832
          LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) );
51becbde   Peter M. Groen   Committed the ent...
833
834
835
836
837
          m_connectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
838
  void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
839
  {
0c424e03   Steven   syntax fixes. WIP
840
841
      (void) response;
      LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
51becbde   Peter M. Groen   Committed the ent...
842
843
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
51becbde   Peter M. Groen   Committed the ent...
844
845
846
847
          // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
          m_operationResult[-100] = false;
          m_pendingOperations.erase(-100);
      }
ca0cf29e   Steven   syntax fixes, con...
848
      if( ConnectionStatus::ConnectInProgress == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
849
      {
ca0cf29e   Steven   syntax fixes, con...
850
          setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
851
852
853
854
855
856
857
858
859
      }
      m_operationsCompleteCV.notify_all();
  }
  
  //void ClientPaho::onDisconnectOnInstance(enum MQTTReasonCodes reasonCode)
  //{
  //    MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode));
  //}
  
ca0cf29e   Steven   syntax fixes, con...
860
  void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& )
51becbde   Peter M. Groen   Committed the ent...
861
  {
ca0cf29e   Steven   syntax fixes, con...
862
      LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) );
51becbde   Peter M. Groen   Committed the ent...
863
864
865
866
867
868
869
870
871
872
873
874
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          m_subscriptions.clear();
          m_pendingSubscriptions.clear();
          m_subscribeTokenToTopic.clear();
          m_unsubscribeTokenToTopic.clear();
  
          // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - pending operations : %2, removing all operations", m_clientId, m_pendingOperations);
          m_operationResult[-200] = true;
          m_pendingOperations.clear();
      }
  
0c424e03   Steven   syntax fixes. WIP
875
      setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
876
  
0c424e03   Steven   syntax fixes. WIP
877
878
      if( m_disconnectPromise )
      {
51becbde   Peter M. Groen   Committed the ent...
879
880
881
882
883
          m_disconnectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
884
  void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
885
  {
0c424e03   Steven   syntax fixes. WIP
886
      (void) response;
ca0cf29e   Steven   syntax fixes, con...
887
      LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) );
51becbde   Peter M. Groen   Committed the ent...
888
      {
ca0cf29e   Steven   syntax fixes, con...
889
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);        
51becbde   Peter M. Groen   Committed the ent...
890
891
892
893
894
          // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations);
          m_operationResult[-200] = false;
          m_pendingOperations.erase(-200);
      }
  
0c424e03   Steven   syntax fixes. WIP
895
      if( MQTTAsync_isConnected( m_client ) )
51becbde   Peter M. Groen   Committed the ent...
896
      {
0c424e03   Steven   syntax fixes. WIP
897
          setConnectionStatus( ConnectionStatus::Connected );
51becbde   Peter M. Groen   Committed the ent...
898
899
900
      }
      else
      {
0c424e03   Steven   syntax fixes. WIP
901
          setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
902
903
      }
  
0c424e03   Steven   syntax fixes. WIP
904
      if( m_disconnectPromise )
51becbde   Peter M. Groen   Committed the ent...
905
906
907
908
909
910
      {
          m_disconnectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
911
  void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
912
913
  {
      auto pd = response.publishData();
d557d523   Peter M. Groen   Fix deferred subs...
914
      // LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
51becbde   Peter M. Groen   Committed the ent...
915
916
917
918
919
920
921
922
923
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = true;
          m_pendingOperations.erase(response.token());
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
924
  void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
925
  {
ca0cf29e   Steven   syntax fixes, con...
926
      LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
51becbde   Peter M. Groen   Committed the ent...
927
928
929
930
931
932
933
934
935
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
          m_pendingOperations.erase(response.token());
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
936
  void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
937
  {
ca0cf29e   Steven   syntax fixes, con...
938
939
940
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) );
  
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
51becbde   Peter M. Groen   Committed the ent...
941
942
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      bool operationOk = false;
ca0cf29e   Steven   syntax fixes, con...
943
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]()
51becbde   Peter M. Groen   Committed the ent...
944
945
946
      {
          // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = operationOk;
ca0cf29e   Steven   syntax fixes, con...
947
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
948
      });
ca0cf29e   Steven   syntax fixes, con...
949
950
      auto it = m_subscribeTokenToTopic.find( response.token() );
      if( m_subscribeTokenToTopic.end() == it )
0c424e03   Steven   syntax fixes. WIP
951
      {
ca0cf29e   Steven   syntax fixes, con...
952
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
953
954
955
956
957
          return;
      }
      auto topic = it->second;
      m_subscribeTokenToTopic.erase(it);
  
ca0cf29e   Steven   syntax fixes, con...
958
959
      auto pendingIt = m_pendingSubscriptions.find( topic );
      if( m_pendingSubscriptions.end() == pendingIt )
51becbde   Peter M. Groen   Committed the ent...
960
      {
ca0cf29e   Steven   syntax fixes, con...
961
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
962
963
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
964
      if( response.qos() != pendingIt->second.qos )
51becbde   Peter M. Groen   Committed the ent...
965
      {
ca0cf29e   Steven   syntax fixes, con...
966
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) );
51becbde   Peter M. Groen   Committed the ent...
967
      }
ca0cf29e   Steven   syntax fixes, con...
968
969
970
971
  
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) );
      m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) );
      m_pendingSubscriptions.erase( pendingIt );
51becbde   Peter M. Groen   Committed the ent...
972
973
974
      operationOk = true;
  }
  
ca0cf29e   Steven   syntax fixes, con...
975
  void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
976
  {
ca0cf29e   Steven   syntax fixes, con...
977
978
979
      LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
  
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
980
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
981
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
51becbde   Peter M. Groen   Committed the ent...
982
983
984
      {
          // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
ca0cf29e   Steven   syntax fixes, con...
985
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
986
987
      });
  
ca0cf29e   Steven   syntax fixes, con...
988
989
      auto it = m_subscribeTokenToTopic.find( response.token() );
      if( m_subscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
990
      {
ca0cf29e   Steven   syntax fixes, con...
991
          LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
992
993
994
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
995
      m_subscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
996
  
ca0cf29e   Steven   syntax fixes, con...
997
998
      auto pendingIt = m_pendingSubscriptions.find( topic );
      if( m_pendingSubscriptions.end() == pendingIt )
51becbde   Peter M. Groen   Committed the ent...
999
      {
ca0cf29e   Steven   syntax fixes, con...
1000
          LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1001
1002
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
1003
1004
      LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) );
      m_pendingSubscriptions.erase( pendingIt );
51becbde   Peter M. Groen   Committed the ent...
1005
1006
  }
  
ca0cf29e   Steven   syntax fixes, con...
1007
  void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
1008
  {
ca0cf29e   Steven   syntax fixes, con...
1009
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) );
51becbde   Peter M. Groen   Committed the ent...
1010
  
ca0cf29e   Steven   syntax fixes, con...
1011
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
1012
1013
1014
1015
1016
1017
1018
1019
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  
      // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token.
      // As a workaround the last unsubscribe token is stored and is used when no valid token is available.
      // This is by no means bullet proof because rapid unsubscribes in succession will overwrite this member
      // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled
      // sequentially (see ClientPaho::unsubscribe)!
      auto token = response.token();
ca0cf29e   Steven   syntax fixes, con...
1020
      if( -1 == token )
51becbde   Peter M. Groen   Committed the ent...
1021
1022
1023
1024
1025
1026
      {
          token = m_lastUnsubscribe;
          m_lastUnsubscribe = -1;
      }
  
      bool operationOk = false;
ca0cf29e   Steven   syntax fixes, con...
1027
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]()
51becbde   Peter M. Groen   Committed the ent...
1028
1029
1030
      {
          // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token);
          m_operationResult[token] = operationOk;
ca0cf29e   Steven   syntax fixes, con...
1031
          m_pendingOperations.erase( token );
51becbde   Peter M. Groen   Committed the ent...
1032
1033
      });
  
ca0cf29e   Steven   syntax fixes, con...
1034
1035
      auto it = m_unsubscribeTokenToTopic.find( token );
      if( m_unsubscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
1036
      {
ca0cf29e   Steven   syntax fixes, con...
1037
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) );
51becbde   Peter M. Groen   Committed the ent...
1038
1039
1040
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
1041
      m_unsubscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
1042
  
ca0cf29e   Steven   syntax fixes, con...
1043
1044
1045
1046
      auto registeredIt = m_subscriptions.find( topic );
      if( m_subscriptions.end() == registeredIt )
      {
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1047
1048
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
1049
1050
1051
  
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) );
      m_subscriptions.erase( registeredIt );
51becbde   Peter M. Groen   Committed the ent...
1052
1053
1054
      operationOk = true;
  }
  
ca0cf29e   Steven   syntax fixes, con...
1055
  void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
1056
  {
ca0cf29e   Steven   syntax fixes, con...
1057
1058
      LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
1059
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
1060
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
51becbde   Peter M. Groen   Committed the ent...
1061
1062
1063
      {
          // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
ca0cf29e   Steven   syntax fixes, con...
1064
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
1065
1066
      });
  
ca0cf29e   Steven   syntax fixes, con...
1067
1068
      auto it = m_unsubscribeTokenToTopic.find( response.token() );
      if( m_unsubscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
1069
      {
ca0cf29e   Steven   syntax fixes, con...
1070
          LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1071
1072
1073
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
1074
      m_unsubscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
1075
1076
  }
  
ca0cf29e   Steven   syntax fixes, con...
1077
  int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message )
51becbde   Peter M. Groen   Committed the ent...
1078
  {
ca0cf29e   Steven   syntax fixes, con...
1079
      LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1080
  
ca0cf29e   Steven   syntax fixes, con...
1081
      std::function<void( MqttMessage )> cb;
51becbde   Peter M. Groen   Committed the ent...
1082
1083
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
1084
          for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
1085
          {
ca0cf29e   Steven   syntax fixes, con...
1086
              if( boost::regex_match( message.topic(), s.second.topicRegex ) )
51becbde   Peter M. Groen   Committed the ent...
1087
1088
1089
1090
1091
1092
              {
                  cb = s.second.callback;
              }
          }
      }
  
ca0cf29e   Steven   syntax fixes, con...
1093
      if( cb )
51becbde   Peter M. Groen   Committed the ent...
1094
      {
ca0cf29e   Steven   syntax fixes, con...
1095
          cb( message );
51becbde   Peter M. Groen   Committed the ent...
1096
1097
1098
      }
      else
      {
ca0cf29e   Steven   syntax fixes, con...
1099
          LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) );
51becbde   Peter M. Groen   Committed the ent...
1100
1101
1102
1103
      }
      return 1;
  }
  
ca0cf29e   Steven   syntax fixes, con...
1104
  // Reports a delivered message to the delivery-complete callback, if one is set.
  //
  // @param token Token of the message that was delivered.
  void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token )
  {
      LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) );

      if( !m_deliveryCompleteCallback )
      {
          return;
      }

      m_deliveryCompleteCallback( m_clientId, static_cast<std::int32_t>( token ) );
  }
  
ca0cf29e   Steven   syntax fixes, con...
1113
  void ClientPaho::onConnectionLostOnInstance( const std::string& cause )
51becbde   Peter M. Groen   Committed the ent...
1114
  {
ca0cf29e   Steven   syntax fixes, con...
1115
      (void) cause;
51becbde   Peter M. Groen   Committed the ent...
1116
      // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause);
ca0cf29e   Steven   syntax fixes, con...
1117
      setConnectionStatus( ConnectionStatus::ReconnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
1118
1119
1120
  
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      // Remove all tokens related to subscriptions from the active operations.
ca0cf29e   Steven   syntax fixes, con...
1121
      for( const auto& p : m_subscribeTokenToTopic )
51becbde   Peter M. Groen   Committed the ent...
1122
1123
      {
          // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
ca0cf29e   Steven   syntax fixes, con...
1124
          m_pendingOperations.erase( p.first );
51becbde   Peter M. Groen   Committed the ent...
1125
1126
      }
  
ca0cf29e   Steven   syntax fixes, con...
1127
      for( const auto& p : m_unsubscribeTokenToTopic )
51becbde   Peter M. Groen   Committed the ent...
1128
1129
      {
          // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
ca0cf29e   Steven   syntax fixes, con...
1130
          m_pendingOperations.erase( p.first );
51becbde   Peter M. Groen   Committed the ent...
1131
1132
1133
1134
1135
1136
1137
      }
      // Clear the administration used in the subscribe process.
      m_subscribeTokenToTopic.clear();
      m_unsubscribeTokenToTopic.clear();
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1138
  void ClientPaho::onFirstConnect( void* context, char* cause )
9421324b   Peter M. Groen   First fix on conn...
1139
1140
  {
      LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1141
      if( context )
9421324b   Peter M. Groen   First fix on conn...
1142
      {
ca0cf29e   Steven   syntax fixes, con...
1143
1144
          auto *cl = reinterpret_cast<ClientPaho*>( context );
          std::string reason( nullptr == cause ? "Unknown cause" : cause );
9802eeb9   Steven   statement changes
1145
          cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } );
9421324b   Peter M. Groen   First fix on conn...
1146
1147
1148
      }
  }
  
ca0cf29e   Steven   syntax fixes, con...
1149
  void ClientPaho::onConnect( void* context, char* cause )
51becbde   Peter M. Groen   Committed the ent...
1150
  {
9421324b   Peter M. Groen   First fix on conn...
1151
      LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1152
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1153
      {
ca0cf29e   Steven   syntax fixes, con...
1154
1155
1156
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          std::string reason( nullptr == cause ? "unknown cause" : cause );
          cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } );
51becbde   Peter M. Groen   Committed the ent...
1157
1158
1159
1160
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1161
  void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1162
  {
2c0c99a5   Peter M. Groen   Fix connect Callb...
1163
      LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1164
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1165
      {
ca0cf29e   Steven   syntax fixes, con...
1166
1167
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
76d01373   Peter M. Groen   Fix on connection
1168
          {
51becbde   Peter M. Groen   Committed the ent...
1169
              // connect should always have a valid response struct.
ca0cf29e   Steven   syntax fixes, con...
1170
              LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" );
2c0c99a5   Peter M. Groen   Fix connect Callb...
1171
              return;
51becbde   Peter M. Groen   Committed the ent...
1172
          }
2c0c99a5   Peter M. Groen   Fix connect Callb...
1173
          // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
ca0cf29e   Steven   syntax fixes, con...
1174
          cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } );
51becbde   Peter M. Groen   Committed the ent...
1175
1176
1177
1178
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1179
  void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1180
  {
ca0cf29e   Steven   syntax fixes, con...
1181
1182
      LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) );
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1183
      {
ca0cf29e   Steven   syntax fixes, con...
1184
1185
1186
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
      }
  }
  
  //// static
  //void ClientPaho::onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode)
  //{
  //    apply_unused_parameters(properties);
  //    try {
  //        if (context) {
  //            auto* cl = reinterpret_cast<ClientPaho*>(context);
  //            cl->pushIncomingEvent([cl, reasonCode]() { cl->onDisconnectOnInstance(reasonCode); });
  //        }
  //    }
  //    catch (...) {
  //    }
  //    catch (const std::exception& e) {
  //        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - exception : %1", e.what());
  //    }
  //    catch (...) {
  //        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - unknown exception");
  //    }
  //}
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1211
  void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1212
  {
ca0cf29e   Steven   syntax fixes, con...
1213
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1214
      {
ca0cf29e   Steven   syntax fixes, con...
1215
1216
1217
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttSuccess resp( response ? response->token : 0 );
          cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1218
1219
1220
1221
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1222
  void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1223
  {
9421324b   Peter M. Groen   First fix on conn...
1224
      LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1225
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1226
      {
ca0cf29e   Steven   syntax fixes, con...
1227
1228
1229
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1230
1231
1232
1233
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1234
  void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1235
  {
ca0cf29e   Steven   syntax fixes, con...
1236
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1237
      {
ca0cf29e   Steven   syntax fixes, con...
1238
1239
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
51becbde   Peter M. Groen   Committed the ent...
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
          {
              // publish should always have a valid response struct.
              // toLogFile ("ClientPaho", "onPublishSuccess - no response data");
          }
          MqttSuccess resp(response->token, MqttMessage(response->alt.pub.destinationName == nullptr ? "null" : response->alt.pub.destinationName, response->alt.pub.message));
          cl->pushIncomingEvent([cl, resp]() { cl->onPublishSuccessOnInstance(resp); });
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1250
  void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1251
  {
ca0cf29e   Steven   syntax fixes, con...
1252
1253
      (void) response;
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1254
      {
ca0cf29e   Steven   syntax fixes, con...
1255
1256
1257
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1258
1259
1260
1261
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1262
  void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1263
  {
ca0cf29e   Steven   syntax fixes, con...
1264
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1265
      {
ca0cf29e   Steven   syntax fixes, con...
1266
1267
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
51becbde   Peter M. Groen   Committed the ent...
1268
1269
1270
1271
          {
              // subscribe should always have a valid response struct.
              // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data");
          }
ca0cf29e   Steven   syntax fixes, con...
1272
1273
          MqttSuccess resp( response->token, response->alt.qos );
          cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1274
1275
1276
1277
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1278
  void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1279
  {
ca0cf29e   Steven   syntax fixes, con...
1280
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1281
      {
ca0cf29e   Steven   syntax fixes, con...
1282
1283
1284
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1285
1286
1287
1288
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1289
  void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1290
  {
ca0cf29e   Steven   syntax fixes, con...
1291
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1292
      {
ca0cf29e   Steven   syntax fixes, con...
1293
1294
1295
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttSuccess resp( response ? response->token : -1 );
          cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1296
1297
1298
1299
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1300
  void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1301
  {
ca0cf29e   Steven   syntax fixes, con...
1302
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1303
      {
ca0cf29e   Steven   syntax fixes, con...
1304
1305
1306
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1307
1308
1309
1310
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1311
  int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message )
51becbde   Peter M. Groen   Committed the ent...
1312
1313
  {
  
ca0cf29e   Steven   syntax fixes, con...
1314
      OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]()
51becbde   Peter M. Groen   Committed the ent...
1315
      {
ca0cf29e   Steven   syntax fixes, con...
1316
1317
          MQTTAsync_freeMessage( &message );
          MQTTAsync_free( topicName );
51becbde   Peter M. Groen   Committed the ent...
1318
1319
      });
  
ca0cf29e   Steven   syntax fixes, con...
1320
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1321
      {
ca0cf29e   Steven   syntax fixes, con...
1322
1323
1324
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttMessage msg( topicName, *message );
          cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } );
51becbde   Peter M. Groen   Committed the ent...
1325
1326
1327
1328
1329
1330
      }
  
      return 1; // always return true. Otherwise this callback is triggered again.
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1331
  void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token )
51becbde   Peter M. Groen   Committed the ent...
1332
  {
ca0cf29e   Steven   syntax fixes, con...
1333
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1334
      {
ca0cf29e   Steven   syntax fixes, con...
1335
1336
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } );
51becbde   Peter M. Groen   Committed the ent...
1337
1338
1339
1340
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1341
  void ClientPaho::onConnectionLost( void* context, char* cause )
51becbde   Peter M. Groen   Committed the ent...
1342
  {
ca0cf29e   Steven   syntax fixes, con...
1343
      OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]()
51becbde   Peter M. Groen   Committed the ent...
1344
      {
ca0cf29e   Steven   syntax fixes, con...
1345
          if( cause )
51becbde   Peter M. Groen   Committed the ent...
1346
          {
ca0cf29e   Steven   syntax fixes, con...
1347
              MQTTAsync_free( cause );
51becbde   Peter M. Groen   Committed the ent...
1348
1349
1350
          }
      });
  
ca0cf29e   Steven   syntax fixes, con...
1351
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1352
      {
ca0cf29e   Steven   syntax fixes, con...
1353
1354
1355
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          std::string msg( nullptr == cause ? "cause unknown" : cause );
          cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } );
51becbde   Peter M. Groen   Committed the ent...
1356
1357
1358
1359
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1360
  void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message )
51becbde   Peter M. Groen   Committed the ent...
1361
  {
ca0cf29e   Steven   syntax fixes, con...
1362
1363
      (void) message;
      switch( level )
51becbde   Peter M. Groen   Committed the ent...
1364
1365
1366
      {
          case MQTTASYNC_TRACE_MAXIMUM:
          case MQTTASYNC_TRACE_MEDIUM:
ca0cf29e   Steven   syntax fixes, con...
1367
1368
          case MQTTASYNC_TRACE_MINIMUM:
          {
51becbde   Peter M. Groen   Committed the ent...
1369
1370
1371
              // ("ClientPaho", "paho - %1", message)
              break;
          }
ca0cf29e   Steven   syntax fixes, con...
1372
1373
          case MQTTASYNC_TRACE_PROTOCOL:
          {
51becbde   Peter M. Groen   Committed the ent...
1374
1375
1376
1377
1378
              // ("ClientPaho", "paho - %1", message)
              break;
          }
          case MQTTASYNC_TRACE_ERROR:
          case MQTTASYNC_TRACE_SEVERE:
ca0cf29e   Steven   syntax fixes, con...
1379
1380
          case MQTTASYNC_TRACE_FATAL:
          {
51becbde   Peter M. Groen   Committed the ent...
1381
1382
1383
1384
1385
              // ("ClientPaho", "paho - %1", message)
              break;
          }
      }
  }