Blame view

src/clientpaho.cpp 49.4 KB
b5d9e433   Peter M. Groen   Fixed License Hea...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  /* ****************************************************************************
   * Copyright 2019 Open Systems Development BV                                 *
   *                                                                            *
   * Permission is hereby granted, free of charge, to any person obtaining a    *
   * copy of this software and associated documentation files (the "Software"), *
   * to deal in the Software without restriction, including without limitation  *
   * the rights to use, copy, modify, merge, publish, distribute, sublicense,   *
   * and/or sell copies of the Software, and to permit persons to whom the      *
   * Software is furnished to do so, subject to the following conditions:       *
   *                                                                            *
   * The above copyright notice and this permission notice shall be included in *
   * all copies or substantial portions of the Software.                        *
   *                                                                            *
   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,   *
   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL    *
   * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING    *
   * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER        *
   * DEALINGS IN THE SOFTWARE.                                                  *
   * ***************************************************************************/
51becbde   Peter M. Groen   Committed the ent...
22
23
24
25
26
  #include "clientpaho.h"
  
  #include "errorcode.h"
  #include "mqttutil.h"
  #include "lockguard.h"
9421324b   Peter M. Groen   First fix on conn...
27
  #include "log.h"
51becbde   Peter M. Groen   Committed the ent...
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
  #include "metaprogrammingdefs.h"
  #include "mqttstream.h"
  #include "scopeguard.h"
  #include "uriparser.h"
  
  // std::chrono
  #include "compat-chrono.h"
  
  // std
  #include <algorithm>
  #include <iterator>
  
  using namespace osdev::components::mqtt;
  
  // File-local helpers that produce a correctly initialized MQTTAsync_disconnectOptions,
  // independent of which initializer macros the installed paho-c version provides.
  namespace {
  
  #if defined(__clang__)
  #pragma GCC diagnostic push
  #pragma GCC diagnostic ignored "-Wunused-template"
  #endif
  
  // Generates the has_onSuccess5<T> trait that the two overloads below dispatch on.
  // NOTE(review): presumably true when T has a member named onSuccess5 - confirm in metaprogrammingdefs.h.
  OSDEV_COMPONENTS_HASMEMBER_TRAIT(onSuccess5)
  
  /// Overload selected when has_onSuccess5<TRet>::value is false.
  /// The pointer argument is unused; it only exists to carry the type.
  /// @return the value produced by the MQTTAsync_disconnectOptions_initializer macro.
  template <typename TRet>
  inline typename std::enable_if<!has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
  {
      return MQTTAsync_disconnectOptions_initializer;
  }
  
  /// Overload selected when has_onSuccess5<TRet>::value is true.
  /// The pointer argument is unused; it only exists to carry the type.
  /// @return an initialized TRet, including the onSuccess5/onFailure5 members.
  template <typename TRet>
  inline typename std::enable_if<has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
  {
  // For some reason g++ on centos7 evaluates the function body even when the overload is discarded by SFINAE.
  // This leads to a compile error on an undefined symbol. There the old initializer macro is used, but this
  // overload should never be chosen when the struct does not contain the member onSuccess5!
  // On yocto warrior (mqtt-paho-c 1.3.0) the macro MQTTAsync_disconnectOptions_initializer5 is not defined
  // while the struct does have an onSuccess5 member. In that case correct initializer code is still needed:
  // fall back to the MQTTAsync_disconnectOptions_initializer macro and initialize the
  // additional fields by hand (which unfortunately results in a compiler warning about missing field
  // initializers, suppressed below).
  #ifndef MQTTAsync_disconnectOptions_initializer5
  #pragma GCC diagnostic push
  #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
      TRet ret = MQTTAsync_disconnectOptions_initializer;
      ret.struct_version = 1;
      ret.onSuccess5 = nullptr;
      ret.onFailure5 = nullptr;
      return ret;
  // The pop sits after the return statement; diagnostic pragmas apply by source position, not control flow.
  #pragma GCC diagnostic pop
  #else
      return MQTTAsync_disconnectOptions_initializer5;
  #endif
  }
  
  /// Type-keyed front end for initializeMqttStruct; used as Init<T>::initialize().
  template <typename TRet>
  struct Init
  {
      /// @return the result of the initializeMqttStruct overload that matches TRet.
      static TRet initialize()
      {
          // The null pointer only selects the overload; it is never dereferenced.
          return initializeMqttStruct<TRet>(static_cast<TRet*>(nullptr));
      }
  };
  #if defined(__clang__)
  #pragma GCC diagnostic pop
  #endif
  
  } // namespace
  
  // Counter of ClientPaho instances: incremented in the constructor and decremented in the destructor,
  // which use it to install the paho trace callback for the first instance and remove it after the last.
  std::atomic_int ClientPaho::s_numberOfInstances(0);
  
ca0cf29e   Steven   syntax fixes, con...
97
  /**
   * @brief Construct a client: parse the endpoint, create the paho async handle, register the
   *        paho callbacks and start the thread that runs callbackEventHandler().
   *
   * The first instance that is created also installs the paho trace callback.
   * When MQTTAsync_create fails an error is logged and no worker thread is started.
   *
   * @param _endpoint                 Endpoint string, handed to parseEndpoint().
   * @param _id                       Client id, stored in m_clientId and passed to paho.
   * @param connectionStatusCallback  Stored in m_connectionStatusCallback.
   * @param deliveryCompleteCallback  Stored in m_deliveryCompleteCallback.
   */
  ClientPaho::ClientPaho( const std::string& _endpoint,
      const std::string& _id,
      const std::function<void( const std::string&, ConnectionStatus )>& connectionStatusCallback,
      const std::function<void( const std::string& clientId, std::int32_t pubMsgToken )>& deliveryCompleteCallback )
      : m_mutex()
      , m_endpoint()
      , m_username()
      , m_password()
      , m_clientId(_id)
      , m_pendingOperations()
      , m_operationResult()
      , m_operationsCompleteCV()
      , m_subscriptions()
      , m_pendingSubscriptions()
      , m_subscribeTokenToTopic()
      , m_unsubscribeTokenToTopic()
      , m_pendingPublishes()
      , m_processPendingPublishes(false)
      , m_pendingPublishesReadyCV()
      , m_client()
      , m_connectionStatus(ConnectionStatus::Disconnected)
      , m_connectionStatusCallback(connectionStatusCallback)
      , m_deliveryCompleteCallback(deliveryCompleteCallback)
      , m_lastUnsubscribe(-1)
      , m_connectPromise()
      , m_disconnectPromise()
      , m_callbackEventQueue(m_clientId)
      , m_workerThread()
  {
      // Only the first instance installs the (process wide) paho trace callback.
      if( 0 == s_numberOfInstances++ )
      {
          MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho );
      }

      LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) );
      parseEndpoint(_endpoint);

      auto rc = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr );
      if( MQTTASYNC_SUCCESS == rc )
      {
          // Without these callbacks no connection-lost, message or delivery events are reported,
          // so a failure here must not go unnoticed.
          const int callbackRc = MQTTAsync_setCallbacks( m_client, static_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete );
          if( MQTTASYNC_SUCCESS != callbackRc )
          {
              LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to set callbacks for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( callbackRc ) ) );
          }
          m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this );
      }
      else
      {
          LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( rc ) ) );
      }
  }
  
  /**
   * @brief Tear down the client.
   *
   * A connected client first unsubscribes from everything, waits up to 2 seconds for pending
   * operations and then disconnects (waiting, 5000 ms timeout). Afterwards the paho handle is
   * destroyed, the callback event queue is stopped and the worker thread is joined.
   */
  ClientPaho::~ClientPaho()
  {
      LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) );

      const bool stillConnected = MQTTAsync_isConnected( m_client );
      if( !stillConnected )
      {
          // If the status was already disconnected this call does nothing
          setConnectionStatus( ConnectionStatus::Disconnected );
      }
      else
      {
          this->unsubscribeAll();

          this->waitForCompletion( std::chrono::milliseconds(2000), std::set<int32_t>{} );
          this->disconnect( true, 5000 );
      }

      const bool wasLastInstance = ( 0 == --s_numberOfInstances );
      if( wasLastInstance )
      {
          // encountered a case where termination of the logging system within paho led to a segfault.
          // This was a paho thread that was cleaned while at the same time the logging system was terminated.
          // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less
          // frequently.
          MQTTAsync_setTraceCallback( nullptr );
      }

      MQTTAsync_destroy( &m_client );

      // Stop the event queue before joining the worker thread.
      m_callbackEventQueue.stop();
      if( m_workerThread.joinable() )
      {
          m_workerThread.join();
      }
  }
  
  std::string ClientPaho::clientId() const
  {
      return m_clientId;
  }
  
  /// @return the connection status as currently stored in m_connectionStatus.
  ConnectionStatus ClientPaho::connectionStatus() const
  {
      ConnectionStatus current = m_connectionStatus;
      return current;
  }
  
31eece9b   Steven   added LWT ( last ...
190
  std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
51becbde   Peter M. Groen   Committed the ent...
191
192
193
  {
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
2b967ea7   Steven   promise is not wo...
194
          if( ConnectionStatus::Disconnected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
195
196
197
          {
              return -1;
          }
2b967ea7   Steven   promise is not wo...
198
          setConnectionStatus( ConnectionStatus::ConnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
199
200
      }
  
76d01373   Peter M. Groen   Fix on connection
201
202
      LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) );
  
51becbde   Peter M. Groen   Committed the ent...
203
      MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
9421324b   Peter M. Groen   First fix on conn...
204
      conn_opts.keepAliveInterval = 5;
51becbde   Peter M. Groen   Committed the ent...
205
      conn_opts.cleansession = 1;
2c0c99a5   Peter M. Groen   Fix connect Callb...
206
      conn_opts.onSuccess = nullptr;
51becbde   Peter M. Groen   Committed the ent...
207
208
209
      conn_opts.onFailure = &ClientPaho::onConnectFailure;
      conn_opts.context = this;
      conn_opts.automaticReconnect = 1;
31eece9b   Steven   added LWT ( last ...
210
  
76d01373   Peter M. Groen   Fix on connection
211
212
213
214
      // Make sure we get a signal if the promise is fulfilled
      auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect );
      if( MQTTASYNC_SUCCESS == ccb )
      {
ca0cf29e   Steven   syntax fixes, con...
215
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
76d01373   Peter M. Groen   Fix on connection
216
217
218
      }
      else
      {
ca0cf29e   Steven   syntax fixes, con...
219
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
76d01373   Peter M. Groen   Fix on connection
220
221
222
      }
  
      // Setup the last will and testament, if so desired.
31eece9b   Steven   added LWT ( last ...
223
224
225
226
227
228
229
      if( !lwt.topic().empty() )
      {
          MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
          will_opts.message = lwt.message().c_str();
          will_opts.topicName = lwt.topic().c_str();
  
          conn_opts.will = &will_opts;
76d01373   Peter M. Groen   Fix on connection
230
231
  
          LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) );
31eece9b   Steven   added LWT ( last ...
232
233
234
235
236
237
      }
      else
      {
          conn_opts.will = nullptr;
      }
  
2b967ea7   Steven   promise is not wo...
238
      if( !m_username.empty() )
51becbde   Peter M. Groen   Committed the ent...
239
240
241
242
      {
          conn_opts.username = m_username.c_str();
      }
  
2b967ea7   Steven   promise is not wo...
243
      if( !m_password.empty() )
51becbde   Peter M. Groen   Committed the ent...
244
245
246
247
248
249
250
      {
          conn_opts.password = m_password.c_str();
      }
  
      std::promise<void> waitForConnectPromise{};
      auto waitForConnect = waitForConnectPromise.get_future();
      m_connectPromise.reset();
2b967ea7   Steven   promise is not wo...
251
      if( wait )
51becbde   Peter M. Groen   Committed the ent...
252
      {
2b967ea7   Steven   promise is not wo...
253
          m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) );
51becbde   Peter M. Groen   Committed the ent...
254
255
256
      }
  
      {
2b967ea7   Steven   promise is not wo...
257
258
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
          if( !m_pendingOperations.insert( -100 ).second )
51becbde   Peter M. Groen   Committed the ent...
259
260
261
          {
              // Write something
          }
2b967ea7   Steven   promise is not wo...
262
          m_operationResult.erase( -100 );
51becbde   Peter M. Groen   Committed the ent...
263
264
      }
  
2b967ea7   Steven   promise is not wo...
265
266
      int rc = MQTTAsync_connect( m_client, &conn_opts );
      if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
267
      {
2b967ea7   Steven   promise is not wo...
268
269
          setConnectionStatus( ConnectionStatus::Disconnected );
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
270
271
272
273
          m_operationResult[-100] = false;
          m_pendingOperations.erase(-100);
      }
  
2b967ea7   Steven   promise is not wo...
274
      if( wait )
51becbde   Peter M. Groen   Committed the ent...
275
276
277
278
279
280
281
      {
          waitForConnect.get();
          m_connectPromise.reset();
      }
      return -100;
  }
  
0c424e03   Steven   syntax fixes. WIP
282
  std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
51becbde   Peter M. Groen   Committed the ent...
283
284
285
286
287
  {
      ConnectionStatus currentStatus = m_connectionStatus;
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
0c424e03   Steven   syntax fixes. WIP
288
289
          if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus )
          {
51becbde   Peter M. Groen   Committed the ent...
290
291
292
293
              return -1;
          }
  
          currentStatus = m_connectionStatus;
0c424e03   Steven   syntax fixes. WIP
294
          setConnectionStatus( ConnectionStatus::DisconnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
295
296
297
298
299
300
301
302
303
304
305
      }
  
      MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize();
      disconn_opts.timeout = timeoutMs;
      disconn_opts.onSuccess = &ClientPaho::onDisconnectSuccess;
      disconn_opts.onFailure = &ClientPaho::onDisconnectFailure;
      disconn_opts.context = this;
  
      std::promise<void> waitForDisconnectPromise{};
      auto waitForDisconnect = waitForDisconnectPromise.get_future();
      m_disconnectPromise.reset();
0c424e03   Steven   syntax fixes. WIP
306
307
      if( wait )
      {
51becbde   Peter M. Groen   Committed the ent...
308
309
310
311
312
          m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise));
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
313
          if( !m_pendingOperations.insert( -200 ).second )
51becbde   Peter M. Groen   Committed the ent...
314
          {
ca0cf29e   Steven   syntax fixes, con...
315
              LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
316
          }
ca0cf29e   Steven   syntax fixes, con...
317
          m_operationResult.erase( -200 );
51becbde   Peter M. Groen   Committed the ent...
318
319
      }
  
ca0cf29e   Steven   syntax fixes, con...
320
      int rc = MQTTAsync_disconnect( m_client, &disconn_opts );
0c424e03   Steven   syntax fixes. WIP
321
      if( MQTTASYNC_SUCCESS != rc )
76d01373   Peter M. Groen   Fix on connection
322
      {
0c424e03   Steven   syntax fixes. WIP
323
          if( MQTTASYNC_DISCONNECTED == rc )
76d01373   Peter M. Groen   Fix on connection
324
          {
51becbde   Peter M. Groen   Committed the ent...
325
326
327
              currentStatus = ConnectionStatus::Disconnected;
          }
  
0c424e03   Steven   syntax fixes. WIP
328
329
          setConnectionStatus( currentStatus );
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
330
          m_operationResult[-200] = false;
ca0cf29e   Steven   syntax fixes, con...
331
          m_pendingOperations.erase( -200 );
51becbde   Peter M. Groen   Committed the ent...
332
  
0c424e03   Steven   syntax fixes. WIP
333
          if( MQTTASYNC_DISCONNECTED == rc )
2b967ea7   Steven   promise is not wo...
334
          {
51becbde   Peter M. Groen   Committed the ent...
335
336
              return -1;
          }
ca0cf29e   Steven   syntax fixes, con...
337
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
338
339
      }
  
2b967ea7   Steven   promise is not wo...
340
341
      if( wait )
      {
51becbde   Peter M. Groen   Committed the ent...
342
343
          if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
          {
ca0cf29e   Steven   syntax fixes, con...
344
              LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) );
51becbde   Peter M. Groen   Committed the ent...
345
346
347
348
349
350
351
          }
          waitForDisconnect.get();
          m_disconnectPromise.reset();
      }
      return -200;
  }
  
ca0cf29e   Steven   syntax fixes, con...
352
  std::int32_t ClientPaho::publish( const MqttMessage& message, int qos )
51becbde   Peter M. Groen   Committed the ent...
353
  {
0c424e03   Steven   syntax fixes. WIP
354
      if( ConnectionStatus::DisconnectInProgress == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
355
      {
ca0cf29e   Steven   syntax fixes, con...
356
          LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) );
51becbde   Peter M. Groen   Committed the ent...
357
358
          return -1;
      }
0c424e03   Steven   syntax fixes. WIP
359
      else if( ConnectionStatus::Disconnected == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
360
      {
ca0cf29e   Steven   syntax fixes, con...
361
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) );
2b967ea7   Steven   promise is not wo...
362
          connect( true );
51becbde   Peter M. Groen   Committed the ent...
363
364
      }
  
0c424e03   Steven   syntax fixes. WIP
365
      if( !isValidTopic(message.topic() ) )
51becbde   Peter M. Groen   Committed the ent...
366
      {
ca0cf29e   Steven   syntax fixes, con...
367
          LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) );
51becbde   Peter M. Groen   Committed the ent...
368
369
      }
  
0c424e03   Steven   syntax fixes. WIP
370
      if( qos > 2 )
51becbde   Peter M. Groen   Committed the ent...
371
372
373
      {
          qos = 2;
      }
0c424e03   Steven   syntax fixes. WIP
374
      else if( qos < 0 )
51becbde   Peter M. Groen   Committed the ent...
375
376
377
378
      {
          qos = 0;
      }
  
51becbde   Peter M. Groen   Committed the ent...
379
      std::unique_lock<std::mutex> lck(m_mutex);
9802eeb9   Steven   statement changes
380
      if( ConnectionStatus::Connected != m_connectionStatus || m_processPendingPublishes )
a670240b   Peter M. Groen   Fix on connection
381
      {
51becbde   Peter M. Groen   Committed the ent...
382
          m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
9802eeb9   Steven   statement changes
383
          if( ConnectionStatus::Connected != m_connectionStatus )
a670240b   Peter M. Groen   Fix on connection
384
          {
0c424e03   Steven   syntax fixes. WIP
385
              LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." );
ca0cf29e   Steven   syntax fixes, con...
386
              m_pendingPublishes.push_front( Publish{ qos, message } );
51becbde   Peter M. Groen   Committed the ent...
387
388
389
390
              return -1;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
391
      return publishInternal( message, qos );
51becbde   Peter M. Groen   Committed the ent...
392
393
394
395
396
397
  }
  
  void ClientPaho::publishPending()
  {
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
0c424e03   Steven   syntax fixes. WIP
398
          if( !m_processPendingPublishes )
2b967ea7   Steven   promise is not wo...
399
          {
51becbde   Peter M. Groen   Committed the ent...
400
401
402
403
              return;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
404
      if( ConnectionStatus::Connected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
405
      {
a670240b   Peter M. Groen   Fix on connection
406
          LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) )
51becbde   Peter M. Groen   Committed the ent...
407
408
      }
  
0c424e03   Steven   syntax fixes. WIP
409
      while( !m_pendingPublishes.empty() )
51becbde   Peter M. Groen   Committed the ent...
410
411
      {
          const auto& pub = m_pendingPublishes.back();
ca0cf29e   Steven   syntax fixes, con...
412
          publishInternal( pub.data, pub.qos );
51becbde   Peter M. Groen   Committed the ent...
413
414
415
416
417
418
419
420
421
422
423
  
          m_pendingPublishes.pop_back();
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          m_processPendingPublishes = false;
      }
      m_pendingPublishesReadyCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
424
  std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb )
51becbde   Peter M. Groen   Committed the ent...
425
  {
0c424e03   Steven   syntax fixes. WIP
426
      if( ConnectionStatus::Connected != m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
427
      {
11fe0b09   Peter M. Groen   Rework for subscr...
428
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) );
51becbde   Peter M. Groen   Committed the ent...
429
430
      }
  
0c424e03   Steven   syntax fixes. WIP
431
      if( !isValidTopic( topic ) )
51becbde   Peter M. Groen   Committed the ent...
432
433
      {
          // ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
11fe0b09   Peter M. Groen   Rework for subscr...
434
435
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) );
          return -1;
51becbde   Peter M. Groen   Committed the ent...
436
437
      }
  
0c424e03   Steven   syntax fixes. WIP
438
      if( qos > 2 )
51becbde   Peter M. Groen   Committed the ent...
439
440
441
      {
          qos = 2;
      }
0c424e03   Steven   syntax fixes. WIP
442
      else if( qos < 0 )
51becbde   Peter M. Groen   Committed the ent...
443
444
445
446
447
448
449
450
      {
          qos = 0;
      }
  
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  
          auto itExisting = m_subscriptions.find(topic);
0c424e03   Steven   syntax fixes. WIP
451
452
453
454
          if( m_subscriptions.end() != itExisting )
          {
              if( itExisting->second.qos == qos )
              {
51becbde   Peter M. Groen   Committed the ent...
455
456
457
458
459
460
                  return -1;
              }
              // (OverlappingTopicException, "existing subscription with same topic, but different qos", topic);
          }
  
          auto itPending = m_pendingSubscriptions.find(topic);
0c424e03   Steven   syntax fixes. WIP
461
462
463
464
465
466
467
          if( m_pendingSubscriptions.end() != itPending )
          {
              if( itPending->second.qos == qos )
              {
                  auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; } );
                  if( m_subscribeTokenToTopic.end() != itToken )
                  {
51becbde   Peter M. Groen   Committed the ent...
468
469
                      return itToken->first;
                  }
0c424e03   Steven   syntax fixes. WIP
470
471
                  else
                  {
51becbde   Peter M. Groen   Committed the ent...
472
473
474
475
476
477
478
                      return -1;
                  }
              }
              // (OverlappingTopicException, "pending subscription with same topic, but different qos", topic);
          }
  
          std::string existingTopic{};
0c424e03   Steven   syntax fixes. WIP
479
          if( isOverlappingInternal( topic, existingTopic ) )
51becbde   Peter M. Groen   Committed the ent...
480
481
          {
              // (OverlappingTopicException, "overlapping topic", existingTopic, topic);
11fe0b09   Peter M. Groen   Rework for subscr...
482
              LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
483
484
          }
  
ca0cf29e   Steven   syntax fixes, con...
485
486
          LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
          m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) );
51becbde   Peter M. Groen   Committed the ent...
487
      }
0c424e03   Steven   syntax fixes. WIP
488
      return subscribeInternal( topic, qos );
51becbde   Peter M. Groen   Committed the ent...
489
490
491
492
  }
  
  void ClientPaho::resubscribe()
  {
ca0cf29e   Steven   syntax fixes, con...
493
      decltype( m_pendingSubscriptions ) pendingSubscriptions{};
51becbde   Peter M. Groen   Committed the ent...
494
495
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
496
          std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) );
51becbde   Peter M. Groen   Committed the ent...
497
498
      }
  
0c424e03   Steven   syntax fixes. WIP
499
      for( const auto& s : pendingSubscriptions )
51becbde   Peter M. Groen   Committed the ent...
500
      {
0c424e03   Steven   syntax fixes. WIP
501
          subscribeInternal( s.first, s.second.qos );
51becbde   Peter M. Groen   Committed the ent...
502
503
504
      }
  }
  
31eece9b   Steven   added LWT ( last ...
505
  std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos )
51becbde   Peter M. Groen   Committed the ent...
506
507
  {
      {
0c424e03   Steven   syntax fixes. WIP
508
          OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
51becbde   Peter M. Groen   Committed the ent...
509
          bool found = false;
0c424e03   Steven   syntax fixes. WIP
510
          for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
511
          {
0c424e03   Steven   syntax fixes. WIP
512
              if( topic == s.first && qos == s.second.qos )
51becbde   Peter M. Groen   Committed the ent...
513
514
515
516
517
              {
                  found = true;
                  break;
              }
          }
0c424e03   Steven   syntax fixes. WIP
518
          if( !found )
51becbde   Peter M. Groen   Committed the ent...
519
520
521
522
523
524
525
526
527
528
529
530
531
532
          {
              return -1;
          }
      }
  
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onUnsubscribeSuccess;
      opts.onFailure = &ClientPaho::onUnsubscribeFailure;
      opts.context = this;
  
      {
          // Need to lock the mutex because it is possible that the callback is faster than
          // the insertion of the token into the pending operations.
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
533
          auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts );
0c424e03   Steven   syntax fixes. WIP
534
          if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
535
          {
ca0cf29e   Steven   syntax fixes, con...
536
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
537
538
          }
  
0c424e03   Steven   syntax fixes. WIP
539
          if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
540
          {
ca0cf29e   Steven   syntax fixes, con...
541
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token )  + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
542
543
          }
  
0c424e03   Steven   syntax fixes. WIP
544
545
          m_operationResult.erase( opts.token );
          if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 )
51becbde   Peter M. Groen   Committed the ent...
546
          {
ca0cf29e   Steven   syntax fixes, con...
547
              LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic  " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
548
549
550
551
552
553
554
555
556
557
558
559
560
          }
          m_lastUnsubscribe = opts.token; // centos7 workaround
          m_unsubscribeTokenToTopic[opts.token] = topic;
      }
  
      // Because of a bug in paho-c on centos7 the unsubscribes need to be sequential (best effort).
      this->waitForCompletion(std::chrono::seconds(1), std::set<int32_t>{ opts.token });
  
      return opts.token;
  }
  
  void ClientPaho::unsubscribeAll()
  {
0c424e03   Steven   syntax fixes. WIP
561
      decltype( m_subscriptions ) subscriptions{};
51becbde   Peter M. Groen   Committed the ent...
562
563
564
565
566
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          subscriptions = m_subscriptions;
      }
  
0c424e03   Steven   syntax fixes. WIP
567
568
      for( const auto& s : subscriptions )
      {
ca0cf29e   Steven   syntax fixes, con...
569
          this->unsubscribe( s.first, s.second.qos );
51becbde   Peter M. Groen   Committed the ent...
570
571
572
573
574
      }
  }
  
  std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const
  {
ca0cf29e   Steven   syntax fixes, con...
575
576
577
      if( waitFor <= std::chrono::milliseconds( 0 ) )
      {
          return std::chrono::milliseconds( 0 );
51becbde   Peter M. Groen   Committed the ent...
578
579
580
      }
      std::chrono::milliseconds timeElapsed{};
      {
ca0cf29e   Steven   syntax fixes, con...
581
          osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds )
51becbde   Peter M. Groen   Committed the ent...
582
          {
ca0cf29e   Steven   syntax fixes, con...
583
              timeElapsed = std::chrono::ceil<std::chrono::milliseconds>( sinceStart );
51becbde   Peter M. Groen   Committed the ent...
584
585
          });
          std::unique_lock<std::mutex> lck(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
586
  
51becbde   Peter M. Groen   Committed the ent...
587
588
589
          // ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations);
          m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]()
          {
ca0cf29e   Steven   syntax fixes, con...
590
              if( tokens.empty() )
51becbde   Peter M. Groen   Committed the ent...
591
592
593
              { // wait for all operations to end
                  return m_pendingOperations.empty();
              }
ca0cf29e   Steven   syntax fixes, con...
594
              else if( tokens.size() == 1 )
51becbde   Peter M. Groen   Committed the ent...
595
              {
ca0cf29e   Steven   syntax fixes, con...
596
                  return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end();
51becbde   Peter M. Groen   Committed the ent...
597
598
599
600
601
602
603
604
605
              }
              std::vector<std::int32_t> intersect{};
              std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect));
              return intersect.empty();
          } );
      }
      return timeElapsed;
  }
  
ca0cf29e   Steven   syntax fixes, con...
606
  bool ClientPaho::isOverlapping( const std::string& topic ) const
51becbde   Peter M. Groen   Committed the ent...
607
608
  {
      std::string existingTopic{};
ca0cf29e   Steven   syntax fixes, con...
609
      return isOverlapping( topic, existingTopic );
51becbde   Peter M. Groen   Committed the ent...
610
611
  }
  
ca0cf29e   Steven   syntax fixes, con...
612
  bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const
51becbde   Peter M. Groen   Committed the ent...
613
614
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
615
      return isOverlappingInternal( topic, existingTopic );
51becbde   Peter M. Groen   Committed the ent...
616
617
618
619
620
621
622
  }
  
  // Returns a copy of the tokens of all operations that are pending at the moment of the call.
  std::vector<std::int32_t> ClientPaho::pendingOperations() const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      // Build the copy directly from the iterator range while the lock is held.
      std::vector<std::int32_t> snapshot( m_pendingOperations.begin(), m_pendingOperations.end() );
      return snapshot;
  }
  
  // Tells whether m_pendingSubscriptions holds at least one entry.
  bool ClientPaho::hasPendingSubscriptions() const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      const bool nonePending = m_pendingSubscriptions.empty();
      return !nonePending;
  }
  
ca0cf29e   Steven   syntax fixes, con...
633
  // Looks up the recorded outcome of an operation.
  //
  // token   : token of the operation.
  // returns : the value stored in m_operationResult for the token, or an empty optional
  //           when no result has been recorded for it.
  boost::optional<bool> ClientPaho::operationResult( std::int32_t token ) const
  {
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      const auto entry = m_operationResult.find( token );
      if( m_operationResult.end() == entry )
      {
          return boost::optional<bool>{};
      }

      boost::optional<bool> outcome{};
      outcome = entry->second;
      return outcome;
  }
  
ca0cf29e   Steven   syntax fixes, con...
645
  void ClientPaho::parseEndpoint( const std::string& _endpoint )
51becbde   Peter M. Groen   Committed the ent...
646
  {
ca0cf29e   Steven   syntax fixes, con...
647
648
      auto ep = UriParser::parse( _endpoint );
      if( ep.find( "user" ) != ep.end() )
51becbde   Peter M. Groen   Committed the ent...
649
650
651
652
653
      {
          m_username = ep["user"];
          ep["user"].clear();
      }
  
ca0cf29e   Steven   syntax fixes, con...
654
      if( ep.find( "password" ) != ep.end() )
51becbde   Peter M. Groen   Committed the ent...
655
656
657
658
      {
          m_password = ep["password"];
          ep["password"].clear();
      }
ca0cf29e   Steven   syntax fixes, con...
659
      m_endpoint = UriParser::toString( ep );
51becbde   Peter M. Groen   Committed the ent...
660
661
  }
  
ca0cf29e   Steven   syntax fixes, con...
662
  std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos )
51becbde   Peter M. Groen   Committed the ent...
663
664
665
666
667
668
669
670
671
672
673
674
  {
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onPublishSuccess;
      opts.onFailure = &ClientPaho::onPublishFailure;
      opts.context = this;
      auto msg = message.toAsyncMessage();
      msg.qos = qos;
  
      // Need to lock the mutex because it is possible that the callback is faster than
      // the insertion of the token into the pending operations.
  
      // OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
675
      auto rc = MQTTAsync_sendMessage( m_client, message.topic().c_str(), &msg, &opts );
ca0cf29e   Steven   syntax fixes, con...
676
      if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
677
      {
ca0cf29e   Steven   syntax fixes, con...
678
          LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
679
680
      }
  
ca0cf29e   Steven   syntax fixes, con...
681
      if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
682
      {
d557d523   Peter M. Groen   Fix deferred subs...
683
          // LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
684
      }
ca0cf29e   Steven   syntax fixes, con...
685
      m_operationResult.erase( opts.token );
51becbde   Peter M. Groen   Committed the ent...
686
687
688
      return opts.token;
  }
  
ca0cf29e   Steven   syntax fixes, con...
689
  std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos )
51becbde   Peter M. Groen   Committed the ent...
690
691
692
693
694
695
696
697
698
  {
      MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
      opts.onSuccess = &ClientPaho::onSubscribeSuccess;
      opts.onFailure = &ClientPaho::onSubscribeFailure;
      opts.context = this;
  
      // Need to lock the mutex because it is possible that the callback is faster than
      // the insertion of the token into the pending operations.
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
699
      auto rc = MQTTAsync_subscribe( m_client, topic.c_str(), qos, &opts );
51becbde   Peter M. Groen   Committed the ent...
700
701
      if (MQTTASYNC_SUCCESS != rc)
      {
ca0cf29e   Steven   syntax fixes, con...
702
703
          m_pendingSubscriptions.erase( topic );
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
51becbde   Peter M. Groen   Committed the ent...
704
705
      }
  
ca0cf29e   Steven   syntax fixes, con...
706
      if( !m_pendingOperations.insert( opts.token ).second )
51becbde   Peter M. Groen   Committed the ent...
707
      {
ca0cf29e   Steven   syntax fixes, con...
708
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) );
51becbde   Peter M. Groen   Committed the ent...
709
      }
ca0cf29e   Steven   syntax fixes, con...
710
711
      m_operationResult.erase( opts.token );
      if( m_subscribeTokenToTopic.count( opts.token ) > 0 )
51becbde   Peter M. Groen   Committed the ent...
712
      {
ca0cf29e   Steven   syntax fixes, con...
713
          LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) );
51becbde   Peter M. Groen   Committed the ent...
714
715
716
717
718
      }
      m_subscribeTokenToTopic[opts.token] = topic;
      return opts.token;
  }
  
0c424e03   Steven   syntax fixes. WIP
719
  // Stores the new connection status and, when the status actually changed and a callback
  // is registered, reports the new status through m_connectionStatusCallback.
  //
  // status : the status to store.
  void ClientPaho::setConnectionStatus( ConnectionStatus status )
  {
      LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - " ) );

      const ConnectionStatus previous = m_connectionStatus;
      m_connectionStatus = status;

      const bool unchanged = !( status != previous );
      if( unchanged || !m_connectionStatusCallback )
      {
          // Nothing to report or nobody to report to.
          return;
      }

      LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - Calling m_connectionStatusCallback" ) );
      m_connectionStatusCallback( m_clientId, status );
  }
  
0c424e03   Steven   syntax fixes. WIP
731
  bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const
51becbde   Peter M. Groen   Committed the ent...
732
733
  {
      existingTopic.clear();
0c424e03   Steven   syntax fixes. WIP
734
      for( const auto& s : m_pendingSubscriptions )
51becbde   Peter M. Groen   Committed the ent...
735
      {
0c424e03   Steven   syntax fixes. WIP
736
          if( testForOverlap( s.first, topic ) )
51becbde   Peter M. Groen   Committed the ent...
737
738
739
740
741
742
          {
              existingTopic = s.first;
              return true;
          }
      }
  
0c424e03   Steven   syntax fixes. WIP
743
      for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
744
      {
ed30280f   Steven   last changes.
745
          if( testForOverlap( s.first, topic ) )
51becbde   Peter M. Groen   Committed the ent...
746
747
748
749
750
751
752
753
          {
              existingTopic = s.first;
              return true;
          }
      }
      return false;
  }
  
ed30280f   Steven   last changes.
754
  void ClientPaho::pushIncomingEvent( std::function<void()> ev )
51becbde   Peter M. Groen   Committed the ent...
755
  {
ed30280f   Steven   last changes.
756
      m_callbackEventQueue.push( ev );
51becbde   Peter M. Groen   Committed the ent...
757
758
759
760
  }
  
  void ClientPaho::callbackEventHandler()
  {
ca0cf29e   Steven   syntax fixes, con...
761
      LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) );
0c424e03   Steven   syntax fixes. WIP
762
763
      for( ;; )
      {
51becbde   Peter M. Groen   Committed the ent...
764
          std::vector<std::function<void()>> events;
0c424e03   Steven   syntax fixes. WIP
765
          if( !m_callbackEventQueue.pop(events) )
51becbde   Peter M. Groen   Committed the ent...
766
767
768
769
          {
              break;
          }
  
0c424e03   Steven   syntax fixes. WIP
770
          for( const auto& ev : events )
51becbde   Peter M. Groen   Committed the ent...
771
772
          {
              ev();
51becbde   Peter M. Groen   Committed the ent...
773
774
          }
      }
ed30280f   Steven   last changes.
775
      LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - leaving callback event handler" ) );
51becbde   Peter M. Groen   Committed the ent...
776
  }
0c424e03   Steven   syntax fixes. WIP
777
  void ClientPaho::onConnectOnInstance( const std::string& cause )
51becbde   Peter M. Groen   Committed the ent...
778
  {
ca0cf29e   Steven   syntax fixes, con...
779
      (void) cause;
51becbde   Peter M. Groen   Committed the ent...
780
781
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ed30280f   Steven   last changes.
782
          std::copy( m_subscriptions.begin(), m_subscriptions.end(), std::inserter( m_pendingSubscriptions, m_pendingSubscriptions.end() ) );
51becbde   Peter M. Groen   Committed the ent...
783
784
785
786
          m_subscriptions.clear();
          m_processPendingPublishes = true; // all publishes are on hold until publishPending is called.
      }
  
ed30280f   Steven   last changes.
787
      setConnectionStatus( ConnectionStatus::Connected );
51becbde   Peter M. Groen   Committed the ent...
788
789
  }
  
2c0c99a5   Peter M. Groen   Fix connect Callb...
790
  void ClientPaho::onConnectSuccessOnInstance()
51becbde   Peter M. Groen   Committed the ent...
791
  {
9802eeb9   Steven   statement changes
792
793
      m_processPendingPublishes = true;
  
d557d523   Peter M. Groen   Fix deferred subs...
794
795
      LogDebug( "[ClientPaho::onConnectSuccessOnInstance]",
                std::string( m_clientId + " - onConnectSuccessOnInstance triggered." ) );
51becbde   Peter M. Groen   Committed the ent...
796
797
798
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // Register the connect callback that is used in reconnect scenarios.
0c424e03   Steven   syntax fixes. WIP
799
800
          auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect );
          if( MQTTASYNC_SUCCESS != rc )
51becbde   Peter M. Groen   Committed the ent...
801
          {
0c424e03   Steven   syntax fixes. WIP
802
              LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
51becbde   Peter M. Groen   Committed the ent...
803
          }
2c0c99a5   Peter M. Groen   Fix connect Callb...
804
  
51becbde   Peter M. Groen   Committed the ent...
805
806
807
808
809
810
          // For MQTTV5
          //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
          //if (MQTTASYNC_SUCCESS != rc) {
          //    // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
          //}
          // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
2c0c99a5   Peter M. Groen   Fix connect Callb...
811
  
51becbde   Peter M. Groen   Committed the ent...
812
813
814
          m_operationResult[-100] = true;
          m_pendingOperations.erase(-100);
      }
9421324b   Peter M. Groen   First fix on conn...
815
  
0c424e03   Steven   syntax fixes. WIP
816
817
      setConnectionStatus( ConnectionStatus::Connected );
      if( m_connectPromise )
51becbde   Peter M. Groen   Committed the ent...
818
      {
ca0cf29e   Steven   syntax fixes, con...
819
          LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) );
51becbde   Peter M. Groen   Committed the ent...
820
821
822
823
824
          m_connectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
825
  void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
826
  {
0c424e03   Steven   syntax fixes. WIP
827
828
      (void) response;
      LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
51becbde   Peter M. Groen   Committed the ent...
829
830
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
51becbde   Peter M. Groen   Committed the ent...
831
832
833
834
          // ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
          m_operationResult[-100] = false;
          m_pendingOperations.erase(-100);
      }
ca0cf29e   Steven   syntax fixes, con...
835
      if( ConnectionStatus::ConnectInProgress == m_connectionStatus )
51becbde   Peter M. Groen   Committed the ent...
836
      {
ca0cf29e   Steven   syntax fixes, con...
837
          setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
838
839
840
841
842
843
844
845
846
      }
      m_operationsCompleteCV.notify_all();
  }
  
  //void ClientPaho::onDisconnectOnInstance(enum MQTTReasonCodes reasonCode)
  //{
  //    MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode));
  //}
  
ca0cf29e   Steven   syntax fixes, con...
847
  void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& )
51becbde   Peter M. Groen   Committed the ent...
848
  {
ca0cf29e   Steven   syntax fixes, con...
849
      LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) );
51becbde   Peter M. Groen   Committed the ent...
850
851
852
853
854
855
856
857
858
859
860
861
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          m_subscriptions.clear();
          m_pendingSubscriptions.clear();
          m_subscribeTokenToTopic.clear();
          m_unsubscribeTokenToTopic.clear();
  
          // ("ClientPaho", "onDisconnectSuccessOnInstance %1 - pending operations : %2, removing all operations", m_clientId, m_pendingOperations);
          m_operationResult[-200] = true;
          m_pendingOperations.clear();
      }
  
0c424e03   Steven   syntax fixes. WIP
862
      setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
863
  
0c424e03   Steven   syntax fixes. WIP
864
865
      if( m_disconnectPromise )
      {
51becbde   Peter M. Groen   Committed the ent...
866
867
868
869
870
          m_disconnectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
871
  void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
872
  {
0c424e03   Steven   syntax fixes. WIP
873
      (void) response;
ca0cf29e   Steven   syntax fixes, con...
874
      LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) );
51becbde   Peter M. Groen   Committed the ent...
875
      {
ca0cf29e   Steven   syntax fixes, con...
876
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);        
51becbde   Peter M. Groen   Committed the ent...
877
878
879
880
881
          // ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations);
          m_operationResult[-200] = false;
          m_pendingOperations.erase(-200);
      }
  
0c424e03   Steven   syntax fixes. WIP
882
      if( MQTTAsync_isConnected( m_client ) )
51becbde   Peter M. Groen   Committed the ent...
883
      {
0c424e03   Steven   syntax fixes. WIP
884
          setConnectionStatus( ConnectionStatus::Connected );
51becbde   Peter M. Groen   Committed the ent...
885
886
887
      }
      else
      {
0c424e03   Steven   syntax fixes. WIP
888
          setConnectionStatus( ConnectionStatus::Disconnected );
51becbde   Peter M. Groen   Committed the ent...
889
890
      }
  
0c424e03   Steven   syntax fixes. WIP
891
      if( m_disconnectPromise )
51becbde   Peter M. Groen   Committed the ent...
892
893
894
895
896
897
      {
          m_disconnectPromise->set_value();
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
898
  void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
899
900
  {
      auto pd = response.publishData();
d557d523   Peter M. Groen   Fix deferred subs...
901
      // LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
51becbde   Peter M. Groen   Committed the ent...
902
903
904
905
906
907
908
909
910
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = true;
          m_pendingOperations.erase(response.token());
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
911
  void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
912
  {
ca0cf29e   Steven   syntax fixes, con...
913
      LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
51becbde   Peter M. Groen   Committed the ent...
914
915
916
917
918
919
920
921
922
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
          // ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
          m_pendingOperations.erase(response.token());
      }
      m_operationsCompleteCV.notify_all();
  }
  
0c424e03   Steven   syntax fixes. WIP
923
  void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
924
  {
ca0cf29e   Steven   syntax fixes, con...
925
926
927
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) );
  
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
51becbde   Peter M. Groen   Committed the ent...
928
929
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      bool operationOk = false;
ca0cf29e   Steven   syntax fixes, con...
930
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]()
51becbde   Peter M. Groen   Committed the ent...
931
932
933
      {
          // ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = operationOk;
ca0cf29e   Steven   syntax fixes, con...
934
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
935
      });
ca0cf29e   Steven   syntax fixes, con...
936
937
      auto it = m_subscribeTokenToTopic.find( response.token() );
      if( m_subscribeTokenToTopic.end() == it )
0c424e03   Steven   syntax fixes. WIP
938
      {
ca0cf29e   Steven   syntax fixes, con...
939
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
940
941
942
943
944
          return;
      }
      auto topic = it->second;
      m_subscribeTokenToTopic.erase(it);
  
ca0cf29e   Steven   syntax fixes, con...
945
946
      auto pendingIt = m_pendingSubscriptions.find( topic );
      if( m_pendingSubscriptions.end() == pendingIt )
51becbde   Peter M. Groen   Committed the ent...
947
      {
ca0cf29e   Steven   syntax fixes, con...
948
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
949
950
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
951
      if( response.qos() != pendingIt->second.qos )
51becbde   Peter M. Groen   Committed the ent...
952
      {
ca0cf29e   Steven   syntax fixes, con...
953
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) );
51becbde   Peter M. Groen   Committed the ent...
954
      }
ca0cf29e   Steven   syntax fixes, con...
955
956
957
958
  
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) );
      m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) );
      m_pendingSubscriptions.erase( pendingIt );
51becbde   Peter M. Groen   Committed the ent...
959
960
961
      operationOk = true;
  }
  
ca0cf29e   Steven   syntax fixes, con...
962
  void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
963
  {
ca0cf29e   Steven   syntax fixes, con...
964
965
966
      LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
  
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
967
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
968
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
51becbde   Peter M. Groen   Committed the ent...
969
970
971
      {
          // MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
ca0cf29e   Steven   syntax fixes, con...
972
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
973
974
      });
  
ca0cf29e   Steven   syntax fixes, con...
975
976
      auto it = m_subscribeTokenToTopic.find( response.token() );
      if( m_subscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
977
      {
ca0cf29e   Steven   syntax fixes, con...
978
          LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
979
980
981
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
982
      m_subscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
983
  
ca0cf29e   Steven   syntax fixes, con...
984
985
      auto pendingIt = m_pendingSubscriptions.find( topic );
      if( m_pendingSubscriptions.end() == pendingIt )
51becbde   Peter M. Groen   Committed the ent...
986
      {
ca0cf29e   Steven   syntax fixes, con...
987
          LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
988
989
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
990
991
      LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) );
      m_pendingSubscriptions.erase( pendingIt );
51becbde   Peter M. Groen   Committed the ent...
992
993
  }
  
ca0cf29e   Steven   syntax fixes, con...
994
  void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response )
51becbde   Peter M. Groen   Committed the ent...
995
  {
ca0cf29e   Steven   syntax fixes, con...
996
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) );
51becbde   Peter M. Groen   Committed the ent...
997
  
ca0cf29e   Steven   syntax fixes, con...
998
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
999
1000
1001
1002
1003
1004
1005
1006
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
  
      // On centos7 the unsubscribe response is a nullptr, so we do not have a valid token.
      // As a workaround the last unsubscribe token is stored and is used when no valid token is available.
      // This is by no means bullet proof because rapid unsubscribes in succession will overwrite this member
      // before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled
      // sequentially (see ClientPaho::unsubscribe)!
      auto token = response.token();
ca0cf29e   Steven   syntax fixes, con...
1007
      if( -1 == token )
51becbde   Peter M. Groen   Committed the ent...
1008
1009
1010
1011
1012
1013
      {
          token = m_lastUnsubscribe;
          m_lastUnsubscribe = -1;
      }
  
      bool operationOk = false;
ca0cf29e   Steven   syntax fixes, con...
1014
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]()
51becbde   Peter M. Groen   Committed the ent...
1015
1016
1017
      {
          // ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token);
          m_operationResult[token] = operationOk;
ca0cf29e   Steven   syntax fixes, con...
1018
          m_pendingOperations.erase( token );
51becbde   Peter M. Groen   Committed the ent...
1019
1020
      });
  
ca0cf29e   Steven   syntax fixes, con...
1021
1022
      auto it = m_unsubscribeTokenToTopic.find( token );
      if( m_unsubscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
1023
      {
ca0cf29e   Steven   syntax fixes, con...
1024
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) );
51becbde   Peter M. Groen   Committed the ent...
1025
1026
1027
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
1028
      m_unsubscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
1029
  
ca0cf29e   Steven   syntax fixes, con...
1030
1031
1032
1033
      auto registeredIt = m_subscriptions.find( topic );
      if( m_subscriptions.end() == registeredIt )
      {
          LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1034
1035
          return;
      }
ca0cf29e   Steven   syntax fixes, con...
1036
1037
1038
  
      LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) );
      m_subscriptions.erase( registeredIt );
51becbde   Peter M. Groen   Committed the ent...
1039
1040
1041
      operationOk = true;
  }
  
ca0cf29e   Steven   syntax fixes, con...
1042
  void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response )
51becbde   Peter M. Groen   Committed the ent...
1043
  {
ca0cf29e   Steven   syntax fixes, con...
1044
1045
      LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
      OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
51becbde   Peter M. Groen   Committed the ent...
1046
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
1047
      OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
51becbde   Peter M. Groen   Committed the ent...
1048
1049
1050
      {
          // ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
          m_operationResult[response.token()] = false;
ca0cf29e   Steven   syntax fixes, con...
1051
          m_pendingOperations.erase( response.token() );
51becbde   Peter M. Groen   Committed the ent...
1052
1053
      });
  
ca0cf29e   Steven   syntax fixes, con...
1054
1055
      auto it = m_unsubscribeTokenToTopic.find( response.token() );
      if( m_unsubscribeTokenToTopic.end() == it )
51becbde   Peter M. Groen   Committed the ent...
1056
      {
ca0cf29e   Steven   syntax fixes, con...
1057
          LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1058
1059
1060
          return;
      }
      auto topic = it->second;
ca0cf29e   Steven   syntax fixes, con...
1061
      m_unsubscribeTokenToTopic.erase( it );
51becbde   Peter M. Groen   Committed the ent...
1062
1063
  }
  
ca0cf29e   Steven   syntax fixes, con...
1064
  int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message )
51becbde   Peter M. Groen   Committed the ent...
1065
  {
ca0cf29e   Steven   syntax fixes, con...
1066
      LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) );
51becbde   Peter M. Groen   Committed the ent...
1067
  
ca0cf29e   Steven   syntax fixes, con...
1068
      std::function<void( MqttMessage )> cb;
51becbde   Peter M. Groen   Committed the ent...
1069
1070
      {
          OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
ca0cf29e   Steven   syntax fixes, con...
1071
          for( const auto& s : m_subscriptions )
51becbde   Peter M. Groen   Committed the ent...
1072
          {
ca0cf29e   Steven   syntax fixes, con...
1073
              if( boost::regex_match( message.topic(), s.second.topicRegex ) )
51becbde   Peter M. Groen   Committed the ent...
1074
1075
1076
1077
1078
1079
              {
                  cb = s.second.callback;
              }
          }
      }
  
ca0cf29e   Steven   syntax fixes, con...
1080
      if( cb )
51becbde   Peter M. Groen   Committed the ent...
1081
      {
ca0cf29e   Steven   syntax fixes, con...
1082
          cb( message );
51becbde   Peter M. Groen   Committed the ent...
1083
1084
1085
      }
      else
      {
ca0cf29e   Steven   syntax fixes, con...
1086
          LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) );
51becbde   Peter M. Groen   Committed the ent...
1087
1088
1089
1090
      }
      return 1;
  }
  
ca0cf29e   Steven   syntax fixes, con...
1091
  // Logs the delivered token and forwards it, together with the client id, to the
  // delivery-complete callback when one is set.
  void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token )
  {
      LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) );

      if( !m_deliveryCompleteCallback )
      {
          // Nobody registered interest in delivery notifications.
          return;
      }

      m_deliveryCompleteCallback( m_clientId, static_cast<std::int32_t>( token ) );
  }
  
ca0cf29e   Steven   syntax fixes, con...
1100
  void ClientPaho::onConnectionLostOnInstance( const std::string& cause )
51becbde   Peter M. Groen   Committed the ent...
1101
  {
ca0cf29e   Steven   syntax fixes, con...
1102
      (void) cause;
51becbde   Peter M. Groen   Committed the ent...
1103
      // ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause);
ca0cf29e   Steven   syntax fixes, con...
1104
      setConnectionStatus( ConnectionStatus::ReconnectInProgress );
51becbde   Peter M. Groen   Committed the ent...
1105
1106
1107
  
      OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
      // Remove all tokens related to subscriptions from the active operations.
ca0cf29e   Steven   syntax fixes, con...
1108
      for( const auto& p : m_subscribeTokenToTopic )
51becbde   Peter M. Groen   Committed the ent...
1109
1110
      {
          // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
ca0cf29e   Steven   syntax fixes, con...
1111
          m_pendingOperations.erase( p.first );
51becbde   Peter M. Groen   Committed the ent...
1112
1113
      }
  
ca0cf29e   Steven   syntax fixes, con...
1114
      for( const auto& p : m_unsubscribeTokenToTopic )
51becbde   Peter M. Groen   Committed the ent...
1115
1116
      {
          // ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
ca0cf29e   Steven   syntax fixes, con...
1117
          m_pendingOperations.erase( p.first );
51becbde   Peter M. Groen   Committed the ent...
1118
1119
1120
1121
1122
1123
1124
      }
      // Clear the administration used in the subscribe process.
      m_subscribeTokenToTopic.clear();
      m_unsubscribeTokenToTopic.clear();
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1125
  void ClientPaho::onFirstConnect( void* context, char* cause )
9421324b   Peter M. Groen   First fix on conn...
1126
1127
  {
      LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1128
      if( context )
9421324b   Peter M. Groen   First fix on conn...
1129
      {
ca0cf29e   Steven   syntax fixes, con...
1130
1131
          auto *cl = reinterpret_cast<ClientPaho*>( context );
          std::string reason( nullptr == cause ? "Unknown cause" : cause );
9802eeb9   Steven   statement changes
1132
          cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } );
9421324b   Peter M. Groen   First fix on conn...
1133
1134
1135
      }
  }
  
ca0cf29e   Steven   syntax fixes, con...
1136
  void ClientPaho::onConnect( void* context, char* cause )
51becbde   Peter M. Groen   Committed the ent...
1137
  {
9421324b   Peter M. Groen   First fix on conn...
1138
      LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1139
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1140
      {
ca0cf29e   Steven   syntax fixes, con...
1141
1142
1143
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          std::string reason( nullptr == cause ? "unknown cause" : cause );
          cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } );
51becbde   Peter M. Groen   Committed the ent...
1144
1145
1146
1147
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1148
  void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1149
  {
2c0c99a5   Peter M. Groen   Fix connect Callb...
1150
      LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1151
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1152
      {
ca0cf29e   Steven   syntax fixes, con...
1153
1154
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
76d01373   Peter M. Groen   Fix on connection
1155
          {
51becbde   Peter M. Groen   Committed the ent...
1156
              // connect should always have a valid response struct.
ca0cf29e   Steven   syntax fixes, con...
1157
              LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" );
2c0c99a5   Peter M. Groen   Fix connect Callb...
1158
              return;
51becbde   Peter M. Groen   Committed the ent...
1159
          }
2c0c99a5   Peter M. Groen   Fix connect Callb...
1160
          // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
ca0cf29e   Steven   syntax fixes, con...
1161
          cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } );
51becbde   Peter M. Groen   Committed the ent...
1162
1163
1164
1165
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1166
  void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1167
  {
ca0cf29e   Steven   syntax fixes, con...
1168
1169
      LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) );
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1170
      {
ca0cf29e   Steven   syntax fixes, con...
1171
1172
1173
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
      }
  }
  
  //// static
  //void ClientPaho::onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode)
  //{
  //    apply_unused_parameters(properties);
  //    try {
  //        if (context) {
  //            auto* cl = reinterpret_cast<ClientPaho*>(context);
  //            cl->pushIncomingEvent([cl, reasonCode]() { cl->onDisconnectOnInstance(reasonCode); });
  //        }
  //    }
  //    catch (...) {
  //    }
  //    catch (const std::exception& e) {
  //        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - exception : %1", e.what());
  //    }
  //    catch (...) {
  //        MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - unknown exception");
  //    }
  //}
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1198
  void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1199
  {
ca0cf29e   Steven   syntax fixes, con...
1200
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1201
      {
ca0cf29e   Steven   syntax fixes, con...
1202
1203
1204
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttSuccess resp( response ? response->token : 0 );
          cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1205
1206
1207
1208
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1209
  void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1210
  {
9421324b   Peter M. Groen   First fix on conn...
1211
      LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." );
ca0cf29e   Steven   syntax fixes, con...
1212
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1213
      {
ca0cf29e   Steven   syntax fixes, con...
1214
1215
1216
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1217
1218
1219
1220
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1221
  void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1222
  {
ca0cf29e   Steven   syntax fixes, con...
1223
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1224
      {
ca0cf29e   Steven   syntax fixes, con...
1225
1226
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
51becbde   Peter M. Groen   Committed the ent...
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
          {
              // publish should always have a valid response struct.
              // toLogFile ("ClientPaho", "onPublishSuccess - no response data");
          }
          MqttSuccess resp(response->token, MqttMessage(response->alt.pub.destinationName == nullptr ? "null" : response->alt.pub.destinationName, response->alt.pub.message));
          cl->pushIncomingEvent([cl, resp]() { cl->onPublishSuccessOnInstance(resp); });
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1237
  void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1238
  {
ca0cf29e   Steven   syntax fixes, con...
1239
1240
      (void) response;
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1241
      {
ca0cf29e   Steven   syntax fixes, con...
1242
1243
1244
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1245
1246
1247
1248
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1249
  void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1250
  {
ca0cf29e   Steven   syntax fixes, con...
1251
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1252
      {
ca0cf29e   Steven   syntax fixes, con...
1253
1254
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          if( !response )
51becbde   Peter M. Groen   Committed the ent...
1255
1256
1257
1258
          {
              // subscribe should always have a valid response struct.
              // MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data");
          }
ca0cf29e   Steven   syntax fixes, con...
1259
1260
          MqttSuccess resp( response->token, response->alt.qos );
          cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1261
1262
1263
1264
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1265
  void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1266
  {
ca0cf29e   Steven   syntax fixes, con...
1267
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1268
      {
ca0cf29e   Steven   syntax fixes, con...
1269
1270
1271
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1272
1273
1274
1275
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1276
  void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response )
51becbde   Peter M. Groen   Committed the ent...
1277
  {
ca0cf29e   Steven   syntax fixes, con...
1278
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1279
      {
ca0cf29e   Steven   syntax fixes, con...
1280
1281
1282
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttSuccess resp( response ? response->token : -1 );
          cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1283
1284
1285
1286
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1287
  void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response )
51becbde   Peter M. Groen   Committed the ent...
1288
  {
ca0cf29e   Steven   syntax fixes, con...
1289
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1290
      {
ca0cf29e   Steven   syntax fixes, con...
1291
1292
1293
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttFailure resp( response );
          cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } );
51becbde   Peter M. Groen   Committed the ent...
1294
1295
1296
1297
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1298
  int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message )
51becbde   Peter M. Groen   Committed the ent...
1299
1300
  {
  
ca0cf29e   Steven   syntax fixes, con...
1301
      OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]()
51becbde   Peter M. Groen   Committed the ent...
1302
      {
ca0cf29e   Steven   syntax fixes, con...
1303
1304
          MQTTAsync_freeMessage( &message );
          MQTTAsync_free( topicName );
51becbde   Peter M. Groen   Committed the ent...
1305
1306
      });
  
ca0cf29e   Steven   syntax fixes, con...
1307
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1308
      {
ca0cf29e   Steven   syntax fixes, con...
1309
1310
1311
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          MqttMessage msg( topicName, *message );
          cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } );
51becbde   Peter M. Groen   Committed the ent...
1312
1313
1314
1315
1316
1317
      }
  
      return 1; // always return true. Otherwise this callback is triggered again.
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1318
  void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token )
51becbde   Peter M. Groen   Committed the ent...
1319
  {
ca0cf29e   Steven   syntax fixes, con...
1320
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1321
      {
ca0cf29e   Steven   syntax fixes, con...
1322
1323
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } );
51becbde   Peter M. Groen   Committed the ent...
1324
1325
1326
1327
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1328
  void ClientPaho::onConnectionLost( void* context, char* cause )
51becbde   Peter M. Groen   Committed the ent...
1329
  {
ca0cf29e   Steven   syntax fixes, con...
1330
      OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]()
51becbde   Peter M. Groen   Committed the ent...
1331
      {
ca0cf29e   Steven   syntax fixes, con...
1332
          if( cause )
51becbde   Peter M. Groen   Committed the ent...
1333
          {
ca0cf29e   Steven   syntax fixes, con...
1334
              MQTTAsync_free( cause );
51becbde   Peter M. Groen   Committed the ent...
1335
1336
1337
          }
      });
  
ca0cf29e   Steven   syntax fixes, con...
1338
      if( context )
51becbde   Peter M. Groen   Committed the ent...
1339
      {
ca0cf29e   Steven   syntax fixes, con...
1340
1341
1342
          auto* cl = reinterpret_cast<ClientPaho*>( context );
          std::string msg( nullptr == cause ? "cause unknown" : cause );
          cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } );
51becbde   Peter M. Groen   Committed the ent...
1343
1344
1345
1346
      }
  }
  
  // static
ca0cf29e   Steven   syntax fixes, con...
1347
  void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message )
51becbde   Peter M. Groen   Committed the ent...
1348
  {
ca0cf29e   Steven   syntax fixes, con...
1349
1350
      (void) message;
      switch( level )
51becbde   Peter M. Groen   Committed the ent...
1351
1352
1353
      {
          case MQTTASYNC_TRACE_MAXIMUM:
          case MQTTASYNC_TRACE_MEDIUM:
ca0cf29e   Steven   syntax fixes, con...
1354
1355
          case MQTTASYNC_TRACE_MINIMUM:
          {
51becbde   Peter M. Groen   Committed the ent...
1356
1357
1358
              // ("ClientPaho", "paho - %1", message)
              break;
          }
ca0cf29e   Steven   syntax fixes, con...
1359
1360
          case MQTTASYNC_TRACE_PROTOCOL:
          {
51becbde   Peter M. Groen   Committed the ent...
1361
1362
1363
1364
1365
              // ("ClientPaho", "paho - %1", message)
              break;
          }
          case MQTTASYNC_TRACE_ERROR:
          case MQTTASYNC_TRACE_SEVERE:
ca0cf29e   Steven   syntax fixes, con...
1366
1367
          case MQTTASYNC_TRACE_FATAL:
          {
51becbde   Peter M. Groen   Committed the ent...
1368
1369
1370
1371
1372
              // ("ClientPaho", "paho - %1", message)
              break;
          }
      }
  }