b5d9e433
Peter M. Groen
Fixed License Hea...
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/* ****************************************************************************
* Copyright 2019 Open Systems Development BV *
* *
* Permission is hereby granted, free of charge, to any person obtaining a *
* copy of this software and associated documentation files (the "Software"), *
* to deal in the Software without restriction, including without limitation *
* the rights to use, copy, modify, merge, publish, distribute, sublicense, *
* and/or sell copies of the Software, and to permit persons to whom the *
* Software is furnished to do so, subject to the following conditions: *
* *
* The above copyright notice and this permission notice shall be included in *
* all copies or substantial portions of the Software. *
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *
* DEALINGS IN THE SOFTWARE. *
* ***************************************************************************/
|
51becbde
Peter M. Groen
Committed the ent...
|
22
23
24
25
26
|
#include "clientpaho.h"
#include "errorcode.h"
#include "mqttutil.h"
#include "lockguard.h"
|
9421324b
Peter M. Groen
First fix on conn...
|
27
|
#include "log.h"
|
51becbde
Peter M. Groen
Committed the ent...
|
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
#include "metaprogrammingdefs.h"
#include "mqttstream.h"
#include "scopeguard.h"
#include "uriparser.h"
// std::chrono
#include "compat-chrono.h"
// std
#include <algorithm>
#include <iterator>
using namespace osdev::components::mqtt;
namespace {
#if defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-template"
#endif
OSDEV_COMPONENTS_HASMEMBER_TRAIT(onSuccess5)
template <typename TRet>
inline typename std::enable_if<!has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
{
return MQTTAsync_disconnectOptions_initializer;
}
template <typename TRet>
inline typename std::enable_if<has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
{
// For some reason g++ on centos7 evaluates the function body even when it is discarded by SFINAE.
// This leads to a compile error on an undefined symbol. We will use the old initializer macro, but this
// method should not be chosen when the struct does not contain member onSuccess5!
// On yocto warrior mqtt-paho-c 1.3.0 the macro MQTTAsync_disconnectOptions_initializer5 is not defined.
// while the struct does have an onSuccess5 member. In that case we do need correct initializer code.
// We fall back to the MQTTAsync_disconnectOptions_initializer macro and initialize
// additional fields ourself (which unfortunately results in a pesky compiler warning about missing field initializers).
#ifndef MQTTAsync_disconnectOptions_initializer5
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmissing-field-initializers"
TRet ret = MQTTAsync_disconnectOptions_initializer;
ret.struct_version = 1;
ret.onSuccess5 = nullptr;
ret.onFailure5 = nullptr;
return ret;
#pragma GCC diagnostic pop
#else
return MQTTAsync_disconnectOptions_initializer5;
#endif
}
template <typename TRet>
struct Init
{
static TRet initialize()
{
return initializeMqttStruct<TRet>(static_cast<TRet*>(nullptr));
}
};
#if defined(__clang__)
#pragma GCC diagnostic pop
#endif
} // namespace
std::atomic_int ClientPaho::s_numberOfInstances(0);
|
ca0cf29e
Steven
syntax fixes, con...
|
97
|
ClientPaho::ClientPaho( const std::string& _endpoint,
|
51becbde
Peter M. Groen
Committed the ent...
|
98
|
const std::string& _id,
|
ca0cf29e
Steven
syntax fixes, con...
|
99
100
|
const std::function<void( const std::string&, ConnectionStatus )>& connectionStatusCallback,
const std::function<void( const std::string& clientId, std::int32_t pubMsgToken )>& deliveryCompleteCallback )
|
51becbde
Peter M. Groen
Committed the ent...
|
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
: m_mutex()
, m_endpoint()
, m_username()
, m_password()
, m_clientId(_id)
, m_pendingOperations()
, m_operationResult()
, m_operationsCompleteCV()
, m_subscriptions()
, m_pendingSubscriptions()
, m_subscribeTokenToTopic()
, m_unsubscribeTokenToTopic()
, m_pendingPublishes()
, m_processPendingPublishes(false)
, m_pendingPublishesReadyCV()
, m_client()
, m_connectionStatus(ConnectionStatus::Disconnected)
, m_connectionStatusCallback(connectionStatusCallback)
, m_deliveryCompleteCallback(deliveryCompleteCallback)
, m_lastUnsubscribe(-1)
, m_connectPromise()
, m_disconnectPromise()
, m_callbackEventQueue(m_clientId)
, m_workerThread()
{
|
ca0cf29e
Steven
syntax fixes, con...
|
126
|
if( 0 == s_numberOfInstances++ )
|
76d01373
Peter M. Groen
Fix on connection
|
127
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
128
|
MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho );
|
51becbde
Peter M. Groen
Committed the ent...
|
129
|
}
|
76d01373
Peter M. Groen
Fix on connection
|
130
|
|
ca0cf29e
Steven
syntax fixes, con...
|
131
|
LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
132
|
parseEndpoint(_endpoint);
|
76d01373
Peter M. Groen
Fix on connection
|
133
|
|
ca0cf29e
Steven
syntax fixes, con...
|
134
135
|
auto rc = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr );
if( MQTTASYNC_SUCCESS == rc )
|
51becbde
Peter M. Groen
Committed the ent...
|
136
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
137
138
|
MQTTAsync_setCallbacks( m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete );
m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this );
|
51becbde
Peter M. Groen
Committed the ent...
|
139
140
141
|
}
else
{
|
76d01373
Peter M. Groen
Fix on connection
|
142
|
LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( rc ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
143
144
145
146
147
|
}
}
ClientPaho::~ClientPaho()
{
|
ca0cf29e
Steven
syntax fixes, con...
|
148
|
LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
149
150
151
152
|
if( MQTTAsync_isConnected( m_client ) )
{
this->unsubscribeAll();
|
ca0cf29e
Steven
syntax fixes, con...
|
153
154
|
this->waitForCompletion( std::chrono::milliseconds(2000), std::set<int32_t>{} );
this->disconnect( true, 5000 );
|
51becbde
Peter M. Groen
Committed the ent...
|
155
156
157
158
|
}
else
{
// If the status was already disconnected this call does nothing
|
ca0cf29e
Steven
syntax fixes, con...
|
159
|
setConnectionStatus( ConnectionStatus::Disconnected );
|
51becbde
Peter M. Groen
Committed the ent...
|
160
161
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
162
|
if( 0 == --s_numberOfInstances )
|
51becbde
Peter M. Groen
Committed the ent...
|
163
164
165
166
167
|
{
// encountered a case where termination of the logging system within paho led to a segfault.
// This was a paho thread that was cleaned while at the same time the logging system was terminated.
// Removing the trace callback will not solve the underlying problem but hopefully will trigger it less
// frequently.
|
ca0cf29e
Steven
syntax fixes, con...
|
168
|
MQTTAsync_setTraceCallback( nullptr );
|
51becbde
Peter M. Groen
Committed the ent...
|
169
170
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
171
|
MQTTAsync_destroy( &m_client );
|
51becbde
Peter M. Groen
Committed the ent...
|
172
173
|
m_callbackEventQueue.stop();
|
ca0cf29e
Steven
syntax fixes, con...
|
174
|
if( m_workerThread.joinable() )
|
51becbde
Peter M. Groen
Committed the ent...
|
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
|
{
m_workerThread.join();
}
}
std::string ClientPaho::clientId() const
{
return m_clientId;
}
ConnectionStatus ClientPaho::connectionStatus() const
{
return m_connectionStatus;
}
|
31eece9b
Steven
added LWT ( last ...
|
190
|
std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
|
51becbde
Peter M. Groen
Committed the ent...
|
191
192
193
|
{
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
2b967ea7
Steven
promise is not wo...
|
194
|
if( ConnectionStatus::Disconnected != m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
195
196
197
|
{
return -1;
}
|
2b967ea7
Steven
promise is not wo...
|
198
|
setConnectionStatus( ConnectionStatus::ConnectInProgress );
|
51becbde
Peter M. Groen
Committed the ent...
|
199
200
|
}
|
76d01373
Peter M. Groen
Fix on connection
|
201
202
|
LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
203
|
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
|
9421324b
Peter M. Groen
First fix on conn...
|
204
|
conn_opts.keepAliveInterval = 5;
|
51becbde
Peter M. Groen
Committed the ent...
|
205
|
conn_opts.cleansession = 1;
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
206
|
conn_opts.onSuccess = nullptr;
|
51becbde
Peter M. Groen
Committed the ent...
|
207
208
209
|
conn_opts.onFailure = &ClientPaho::onConnectFailure;
conn_opts.context = this;
conn_opts.automaticReconnect = 1;
|
31eece9b
Steven
added LWT ( last ...
|
210
|
|
76d01373
Peter M. Groen
Fix on connection
|
211
212
213
214
|
// Make sure we get a signal if the promise is fulfilled
auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect );
if( MQTTASYNC_SUCCESS == ccb )
{
|
ca0cf29e
Steven
syntax fixes, con...
|
215
|
LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
|
76d01373
Peter M. Groen
Fix on connection
|
216
217
218
|
}
else
{
|
ca0cf29e
Steven
syntax fixes, con...
|
219
|
LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
|
76d01373
Peter M. Groen
Fix on connection
|
220
221
222
|
}
// Setup the last will and testament, if so desired.
|
31eece9b
Steven
added LWT ( last ...
|
223
224
225
226
227
228
229
|
if( !lwt.topic().empty() )
{
MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
will_opts.message = lwt.message().c_str();
will_opts.topicName = lwt.topic().c_str();
conn_opts.will = &will_opts;
|
76d01373
Peter M. Groen
Fix on connection
|
230
231
|
LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) );
|
31eece9b
Steven
added LWT ( last ...
|
232
233
234
235
236
237
|
}
else
{
conn_opts.will = nullptr;
}
|
2b967ea7
Steven
promise is not wo...
|
238
|
if( !m_username.empty() )
|
51becbde
Peter M. Groen
Committed the ent...
|
239
240
241
242
|
{
conn_opts.username = m_username.c_str();
}
|
2b967ea7
Steven
promise is not wo...
|
243
|
if( !m_password.empty() )
|
51becbde
Peter M. Groen
Committed the ent...
|
244
245
246
247
248
249
250
|
{
conn_opts.password = m_password.c_str();
}
std::promise<void> waitForConnectPromise{};
auto waitForConnect = waitForConnectPromise.get_future();
m_connectPromise.reset();
|
2b967ea7
Steven
promise is not wo...
|
251
|
if( wait )
|
51becbde
Peter M. Groen
Committed the ent...
|
252
|
{
|
2b967ea7
Steven
promise is not wo...
|
253
|
m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
254
255
256
|
}
{
|
2b967ea7
Steven
promise is not wo...
|
257
258
|
OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
if( !m_pendingOperations.insert( -100 ).second )
|
51becbde
Peter M. Groen
Committed the ent...
|
259
260
261
|
{
// Write something
}
|
2b967ea7
Steven
promise is not wo...
|
262
|
m_operationResult.erase( -100 );
|
51becbde
Peter M. Groen
Committed the ent...
|
263
264
|
}
|
2b967ea7
Steven
promise is not wo...
|
265
266
|
int rc = MQTTAsync_connect( m_client, &conn_opts );
if( MQTTASYNC_SUCCESS != rc )
|
51becbde
Peter M. Groen
Committed the ent...
|
267
|
{
|
2b967ea7
Steven
promise is not wo...
|
268
269
|
setConnectionStatus( ConnectionStatus::Disconnected );
OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
|
51becbde
Peter M. Groen
Committed the ent...
|
270
271
272
273
|
m_operationResult[-100] = false;
m_pendingOperations.erase(-100);
}
|
2b967ea7
Steven
promise is not wo...
|
274
|
if( wait )
|
51becbde
Peter M. Groen
Committed the ent...
|
275
276
277
278
279
280
281
|
{
waitForConnect.get();
m_connectPromise.reset();
}
return -100;
}
|
0c424e03
Steven
syntax fixes. WIP
|
282
|
std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
|
51becbde
Peter M. Groen
Committed the ent...
|
283
284
285
286
287
|
{
ConnectionStatus currentStatus = m_connectionStatus;
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
0c424e03
Steven
syntax fixes. WIP
|
288
289
|
if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
290
291
292
293
|
return -1;
}
currentStatus = m_connectionStatus;
|
0c424e03
Steven
syntax fixes. WIP
|
294
|
setConnectionStatus( ConnectionStatus::DisconnectInProgress );
|
51becbde
Peter M. Groen
Committed the ent...
|
295
296
297
298
299
300
301
302
303
304
305
|
}
MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize();
disconn_opts.timeout = timeoutMs;
disconn_opts.onSuccess = &ClientPaho::onDisconnectSuccess;
disconn_opts.onFailure = &ClientPaho::onDisconnectFailure;
disconn_opts.context = this;
std::promise<void> waitForDisconnectPromise{};
auto waitForDisconnect = waitForDisconnectPromise.get_future();
m_disconnectPromise.reset();
|
0c424e03
Steven
syntax fixes. WIP
|
306
307
|
if( wait )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
308
309
310
311
312
|
m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise));
}
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
313
|
if( !m_pendingOperations.insert( -200 ).second )
|
51becbde
Peter M. Groen
Committed the ent...
|
314
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
315
|
LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
316
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
317
|
m_operationResult.erase( -200 );
|
51becbde
Peter M. Groen
Committed the ent...
|
318
319
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
320
|
int rc = MQTTAsync_disconnect( m_client, &disconn_opts );
|
0c424e03
Steven
syntax fixes. WIP
|
321
|
if( MQTTASYNC_SUCCESS != rc )
|
76d01373
Peter M. Groen
Fix on connection
|
322
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
323
|
if( MQTTASYNC_DISCONNECTED == rc )
|
76d01373
Peter M. Groen
Fix on connection
|
324
|
{
|
51becbde
Peter M. Groen
Committed the ent...
|
325
326
327
|
currentStatus = ConnectionStatus::Disconnected;
}
|
0c424e03
Steven
syntax fixes. WIP
|
328
329
|
setConnectionStatus( currentStatus );
OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
|
51becbde
Peter M. Groen
Committed the ent...
|
330
|
m_operationResult[-200] = false;
|
ca0cf29e
Steven
syntax fixes, con...
|
331
|
m_pendingOperations.erase( -200 );
|
51becbde
Peter M. Groen
Committed the ent...
|
332
|
|
0c424e03
Steven
syntax fixes. WIP
|
333
|
if( MQTTASYNC_DISCONNECTED == rc )
|
2b967ea7
Steven
promise is not wo...
|
334
|
{
|
51becbde
Peter M. Groen
Committed the ent...
|
335
336
|
return -1;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
337
|
LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
338
339
|
}
|
2b967ea7
Steven
promise is not wo...
|
340
341
|
if( wait )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
342
343
|
if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
{
|
ca0cf29e
Steven
syntax fixes, con...
|
344
|
LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
345
346
347
348
349
350
351
|
}
waitForDisconnect.get();
m_disconnectPromise.reset();
}
return -200;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
352
|
std::int32_t ClientPaho::publish( const MqttMessage& message, int qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
353
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
354
|
if( ConnectionStatus::DisconnectInProgress == m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
355
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
356
|
LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
357
358
|
return -1;
}
|
0c424e03
Steven
syntax fixes. WIP
|
359
|
else if( ConnectionStatus::Disconnected == m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
360
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
361
|
LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) );
|
2b967ea7
Steven
promise is not wo...
|
362
|
connect( true );
|
51becbde
Peter M. Groen
Committed the ent...
|
363
364
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
365
|
if( !isValidTopic(message.topic() ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
366
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
367
|
LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
368
369
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
370
|
if( qos > 2 )
|
51becbde
Peter M. Groen
Committed the ent...
|
371
372
373
|
{
qos = 2;
}
|
0c424e03
Steven
syntax fixes. WIP
|
374
|
else if( qos < 0 )
|
51becbde
Peter M. Groen
Committed the ent...
|
375
376
377
378
|
{
qos = 0;
}
|
51becbde
Peter M. Groen
Committed the ent...
|
379
|
std::unique_lock<std::mutex> lck(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
380
|
if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes )
|
a670240b
Peter M. Groen
Fix on connection
|
381
|
{
|
51becbde
Peter M. Groen
Committed the ent...
|
382
|
m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
|
ca0cf29e
Steven
syntax fixes, con...
|
383
|
if( ConnectionStatus::ReconnectInProgress == m_connectionStatus )
|
a670240b
Peter M. Groen
Fix on connection
|
384
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
385
|
LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." );
|
ca0cf29e
Steven
syntax fixes, con...
|
386
|
m_pendingPublishes.push_front( Publish{ qos, message } );
|
51becbde
Peter M. Groen
Committed the ent...
|
387
388
389
390
|
return -1;
}
}
|
0c424e03
Steven
syntax fixes. WIP
|
391
|
return publishInternal( message, qos );
|
51becbde
Peter M. Groen
Committed the ent...
|
392
393
394
395
396
397
|
}
void ClientPaho::publishPending()
{
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
0c424e03
Steven
syntax fixes. WIP
|
398
|
if( !m_processPendingPublishes )
|
2b967ea7
Steven
promise is not wo...
|
399
|
{
|
51becbde
Peter M. Groen
Committed the ent...
|
400
401
402
403
|
return;
}
}
|
0c424e03
Steven
syntax fixes. WIP
|
404
|
if( ConnectionStatus::Connected != m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
405
|
{
|
a670240b
Peter M. Groen
Fix on connection
|
406
|
LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
407
408
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
409
|
while( !m_pendingPublishes.empty() )
|
51becbde
Peter M. Groen
Committed the ent...
|
410
411
|
{
const auto& pub = m_pendingPublishes.back();
|
ca0cf29e
Steven
syntax fixes, con...
|
412
|
publishInternal( pub.data, pub.qos );
|
51becbde
Peter M. Groen
Committed the ent...
|
413
414
415
416
417
418
419
420
421
422
423
|
m_pendingPublishes.pop_back();
}
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
m_processPendingPublishes = false;
}
m_pendingPublishesReadyCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
424
|
std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb )
|
51becbde
Peter M. Groen
Committed the ent...
|
425
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
426
|
if( ConnectionStatus::Connected != m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
427
428
429
430
|
{
// MqttException, "Not connected"
}
|
0c424e03
Steven
syntax fixes. WIP
|
431
|
if( !isValidTopic( topic ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
432
433
434
435
|
{
// ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
}
|
0c424e03
Steven
syntax fixes. WIP
|
436
|
if( qos > 2 )
|
51becbde
Peter M. Groen
Committed the ent...
|
437
438
439
|
{
qos = 2;
}
|
0c424e03
Steven
syntax fixes. WIP
|
440
|
else if( qos < 0 )
|
51becbde
Peter M. Groen
Committed the ent...
|
441
442
443
444
445
446
447
448
|
{
qos = 0;
}
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
auto itExisting = m_subscriptions.find(topic);
|
0c424e03
Steven
syntax fixes. WIP
|
449
450
451
452
|
if( m_subscriptions.end() != itExisting )
{
if( itExisting->second.qos == qos )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
453
454
455
456
457
458
|
return -1;
}
// (OverlappingTopicException, "existing subscription with same topic, but different qos", topic);
}
auto itPending = m_pendingSubscriptions.find(topic);
|
0c424e03
Steven
syntax fixes. WIP
|
459
460
461
462
463
464
465
|
if( m_pendingSubscriptions.end() != itPending )
{
if( itPending->second.qos == qos )
{
auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; } );
if( m_subscribeTokenToTopic.end() != itToken )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
466
467
|
return itToken->first;
}
|
0c424e03
Steven
syntax fixes. WIP
|
468
469
|
else
{
|
51becbde
Peter M. Groen
Committed the ent...
|
470
471
472
473
474
475
476
|
return -1;
}
}
// (OverlappingTopicException, "pending subscription with same topic, but different qos", topic);
}
std::string existingTopic{};
|
0c424e03
Steven
syntax fixes. WIP
|
477
|
if( isOverlappingInternal( topic, existingTopic ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
478
479
480
481
|
{
// (OverlappingTopicException, "overlapping topic", existingTopic, topic);
}
|
ca0cf29e
Steven
syntax fixes, con...
|
482
483
|
LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
484
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
485
|
return subscribeInternal( topic, qos );
|
51becbde
Peter M. Groen
Committed the ent...
|
486
487
488
489
|
}
void ClientPaho::resubscribe()
{
|
ca0cf29e
Steven
syntax fixes, con...
|
490
|
decltype( m_pendingSubscriptions ) pendingSubscriptions{};
|
51becbde
Peter M. Groen
Committed the ent...
|
491
492
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
493
|
std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
494
495
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
496
|
for( const auto& s : pendingSubscriptions )
|
51becbde
Peter M. Groen
Committed the ent...
|
497
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
498
|
subscribeInternal( s.first, s.second.qos );
|
51becbde
Peter M. Groen
Committed the ent...
|
499
500
501
|
}
}
|
31eece9b
Steven
added LWT ( last ...
|
502
|
std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
503
504
|
{
{
|
0c424e03
Steven
syntax fixes. WIP
|
505
|
OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
|
51becbde
Peter M. Groen
Committed the ent...
|
506
|
bool found = false;
|
0c424e03
Steven
syntax fixes. WIP
|
507
|
for( const auto& s : m_subscriptions )
|
51becbde
Peter M. Groen
Committed the ent...
|
508
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
509
|
if( topic == s.first && qos == s.second.qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
510
511
512
513
514
|
{
found = true;
break;
}
}
|
0c424e03
Steven
syntax fixes. WIP
|
515
|
if( !found )
|
51becbde
Peter M. Groen
Committed the ent...
|
516
517
518
519
520
521
522
523
524
525
526
527
528
529
|
{
return -1;
}
}
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &ClientPaho::onUnsubscribeSuccess;
opts.onFailure = &ClientPaho::onUnsubscribeFailure;
opts.context = this;
{
// Need to lock the mutex because it is possible that the callback is faster than
// the insertion of the token into the pending operations.
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
530
|
auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts );
|
0c424e03
Steven
syntax fixes. WIP
|
531
|
if( MQTTASYNC_SUCCESS != rc )
|
51becbde
Peter M. Groen
Committed the ent...
|
532
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
533
|
LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
534
535
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
536
|
if( !m_pendingOperations.insert( opts.token ).second )
|
51becbde
Peter M. Groen
Committed the ent...
|
537
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
538
|
LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token ) + " already in use" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
539
540
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
541
542
|
m_operationResult.erase( opts.token );
if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 )
|
51becbde
Peter M. Groen
Committed the ent...
|
543
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
544
|
LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
545
546
547
548
549
550
551
552
553
554
555
556
557
|
}
m_lastUnsubscribe = opts.token; // centos7 workaround
m_unsubscribeTokenToTopic[opts.token] = topic;
}
// Because of a bug in paho-c on centos7 the unsubscribes need to be sequential (best effort).
this->waitForCompletion(std::chrono::seconds(1), std::set<int32_t>{ opts.token });
return opts.token;
}
void ClientPaho::unsubscribeAll()
{
|
0c424e03
Steven
syntax fixes. WIP
|
558
|
decltype( m_subscriptions ) subscriptions{};
|
51becbde
Peter M. Groen
Committed the ent...
|
559
560
561
562
563
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
subscriptions = m_subscriptions;
}
|
0c424e03
Steven
syntax fixes. WIP
|
564
565
|
for( const auto& s : subscriptions )
{
|
ca0cf29e
Steven
syntax fixes, con...
|
566
|
this->unsubscribe( s.first, s.second.qos );
|
51becbde
Peter M. Groen
Committed the ent...
|
567
568
569
570
571
|
}
}
std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const
{
|
ca0cf29e
Steven
syntax fixes, con...
|
572
573
574
|
if( waitFor <= std::chrono::milliseconds( 0 ) )
{
return std::chrono::milliseconds( 0 );
|
51becbde
Peter M. Groen
Committed the ent...
|
575
576
577
|
}
std::chrono::milliseconds timeElapsed{};
{
|
ca0cf29e
Steven
syntax fixes, con...
|
578
|
osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds )
|
51becbde
Peter M. Groen
Committed the ent...
|
579
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
580
|
timeElapsed = std::chrono::ceil<std::chrono::milliseconds>( sinceStart );
|
51becbde
Peter M. Groen
Committed the ent...
|
581
582
|
});
std::unique_lock<std::mutex> lck(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
583
|
|
51becbde
Peter M. Groen
Committed the ent...
|
584
585
586
|
// ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations);
m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]()
{
|
ca0cf29e
Steven
syntax fixes, con...
|
587
|
if( tokens.empty() )
|
51becbde
Peter M. Groen
Committed the ent...
|
588
589
590
|
{ // wait for all operations to end
return m_pendingOperations.empty();
}
|
ca0cf29e
Steven
syntax fixes, con...
|
591
|
else if( tokens.size() == 1 )
|
51becbde
Peter M. Groen
Committed the ent...
|
592
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
593
|
return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end();
|
51becbde
Peter M. Groen
Committed the ent...
|
594
595
596
597
598
599
600
601
602
|
}
std::vector<std::int32_t> intersect{};
std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect));
return intersect.empty();
} );
}
return timeElapsed;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
603
|
bool ClientPaho::isOverlapping( const std::string& topic ) const
|
51becbde
Peter M. Groen
Committed the ent...
|
604
605
|
{
std::string existingTopic{};
|
ca0cf29e
Steven
syntax fixes, con...
|
606
|
return isOverlapping( topic, existingTopic );
|
51becbde
Peter M. Groen
Committed the ent...
|
607
608
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
609
|
bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const
|
51becbde
Peter M. Groen
Committed the ent...
|
610
611
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
612
|
return isOverlappingInternal( topic, existingTopic );
|
51becbde
Peter M. Groen
Committed the ent...
|
613
614
615
616
617
618
619
|
}
std::vector<std::int32_t> ClientPaho::pendingOperations() const
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
std::vector<std::int32_t> retval{};
retval.resize(m_pendingOperations.size());
|
ca0cf29e
Steven
syntax fixes, con...
|
620
|
std::copy( m_pendingOperations.begin(), m_pendingOperations.end(), retval.begin() );
|
51becbde
Peter M. Groen
Committed the ent...
|
621
622
623
624
625
626
627
628
629
|
return retval;
}
bool ClientPaho::hasPendingSubscriptions() const
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
return !m_pendingSubscriptions.empty();
}
|
ca0cf29e
Steven
syntax fixes, con...
|
630
|
boost::optional<bool> ClientPaho::operationResult( std::int32_t token ) const
|
51becbde
Peter M. Groen
Committed the ent...
|
631
632
633
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
boost::optional<bool> ret{};
|
ca0cf29e
Steven
syntax fixes, con...
|
634
635
|
auto cit = m_operationResult.find( token );
if( m_operationResult.end() != cit )
|
51becbde
Peter M. Groen
Committed the ent...
|
636
637
638
639
640
641
|
{
ret = cit->second;
}
return ret;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
642
|
void ClientPaho::parseEndpoint( const std::string& _endpoint )
|
51becbde
Peter M. Groen
Committed the ent...
|
643
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
644
645
|
auto ep = UriParser::parse( _endpoint );
if( ep.find( "user" ) != ep.end() )
|
51becbde
Peter M. Groen
Committed the ent...
|
646
647
648
649
650
|
{
m_username = ep["user"];
ep["user"].clear();
}
|
ca0cf29e
Steven
syntax fixes, con...
|
651
|
if( ep.find( "password" ) != ep.end() )
|
51becbde
Peter M. Groen
Committed the ent...
|
652
653
654
655
|
{
m_password = ep["password"];
ep["password"].clear();
}
|
ca0cf29e
Steven
syntax fixes, con...
|
656
|
m_endpoint = UriParser::toString( ep );
|
51becbde
Peter M. Groen
Committed the ent...
|
657
658
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
659
|
std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
660
661
662
663
664
665
666
667
668
669
670
671
672
|
{
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &ClientPaho::onPublishSuccess;
opts.onFailure = &ClientPaho::onPublishFailure;
opts.context = this;
auto msg = message.toAsyncMessage();
msg.qos = qos;
// Need to lock the mutex because it is possible that the callback is faster than
// the insertion of the token into the pending operations.
// OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
auto rc = MQTTAsync_sendMessage(m_client, message.topic().c_str(), &msg, &opts);
|
ca0cf29e
Steven
syntax fixes, con...
|
673
|
if( MQTTASYNC_SUCCESS != rc )
|
51becbde
Peter M. Groen
Committed the ent...
|
674
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
675
|
LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
676
677
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
678
|
if( !m_pendingOperations.insert( opts.token ).second )
|
51becbde
Peter M. Groen
Committed the ent...
|
679
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
680
|
LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
681
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
682
|
m_operationResult.erase( opts.token );
|
51becbde
Peter M. Groen
Committed the ent...
|
683
684
685
|
return opts.token;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
686
|
std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
687
688
689
690
691
692
693
694
695
696
697
698
|
{
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &ClientPaho::onSubscribeSuccess;
opts.onFailure = &ClientPaho::onSubscribeFailure;
opts.context = this;
// Need to lock the mutex because it is possible that the callback is faster than
// the insertion of the token into the pending operations.
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
auto rc = MQTTAsync_subscribe(m_client, topic.c_str(), qos, &opts);
if (MQTTASYNC_SUCCESS != rc)
{
|
ca0cf29e
Steven
syntax fixes, con...
|
699
700
|
m_pendingSubscriptions.erase( topic );
LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
701
702
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
703
|
if( !m_pendingOperations.insert( opts.token ).second )
|
51becbde
Peter M. Groen
Committed the ent...
|
704
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
705
|
LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
706
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
707
708
|
m_operationResult.erase( opts.token );
if( m_subscribeTokenToTopic.count( opts.token ) > 0 )
|
51becbde
Peter M. Groen
Committed the ent...
|
709
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
710
|
LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
711
712
713
714
715
|
}
m_subscribeTokenToTopic[opts.token] = topic;
return opts.token;
}
|
0c424e03
Steven
syntax fixes. WIP
|
716
|
void ClientPaho::setConnectionStatus( ConnectionStatus status )
|
51becbde
Peter M. Groen
Committed the ent...
|
717
718
719
|
{
ConnectionStatus curStatus = m_connectionStatus;
m_connectionStatus = status;
|
0c424e03
Steven
syntax fixes. WIP
|
720
|
if( status != curStatus && m_connectionStatusCallback )
|
51becbde
Peter M. Groen
Committed the ent...
|
721
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
722
|
m_connectionStatusCallback( m_clientId, status );
|
51becbde
Peter M. Groen
Committed the ent...
|
723
724
725
|
}
}
|
0c424e03
Steven
syntax fixes. WIP
|
726
|
bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const
|
51becbde
Peter M. Groen
Committed the ent...
|
727
728
|
{
existingTopic.clear();
|
0c424e03
Steven
syntax fixes. WIP
|
729
|
for( const auto& s : m_pendingSubscriptions )
|
51becbde
Peter M. Groen
Committed the ent...
|
730
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
731
|
if( testForOverlap( s.first, topic ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
732
733
734
735
736
737
|
{
existingTopic = s.first;
return true;
}
}
|
0c424e03
Steven
syntax fixes. WIP
|
738
|
for( const auto& s : m_subscriptions )
|
51becbde
Peter M. Groen
Committed the ent...
|
739
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
740
|
if( testForOverlap(s.first, topic ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
|
{
existingTopic = s.first;
return true;
}
}
return false;
}
void ClientPaho::pushIncomingEvent(std::function<void()> ev)
{
m_callbackEventQueue.push(ev);
}
void ClientPaho::callbackEventHandler()
{
|
ca0cf29e
Steven
syntax fixes, con...
|
756
|
LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) );
|
0c424e03
Steven
syntax fixes. WIP
|
757
758
|
for( ;; )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
759
|
std::vector<std::function<void()>> events;
|
0c424e03
Steven
syntax fixes. WIP
|
760
|
if( !m_callbackEventQueue.pop(events) )
|
51becbde
Peter M. Groen
Committed the ent...
|
761
762
763
764
|
{
break;
}
|
0c424e03
Steven
syntax fixes. WIP
|
765
|
for( const auto& ev : events )
|
51becbde
Peter M. Groen
Committed the ent...
|
766
767
|
{
ev();
|
51becbde
Peter M. Groen
Committed the ent...
|
768
769
770
771
|
}
}
// ("ClientPaho", "%1 - leaving callback event handler", m_clientId);
}
|
0c424e03
Steven
syntax fixes. WIP
|
772
|
void ClientPaho::onConnectOnInstance( const std::string& cause )
|
51becbde
Peter M. Groen
Committed the ent...
|
773
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
774
|
(void) cause;
|
51becbde
Peter M. Groen
Committed the ent...
|
775
776
777
778
779
780
781
782
783
784
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
std::copy(m_subscriptions.begin(), m_subscriptions.end(), std::inserter(m_pendingSubscriptions, m_pendingSubscriptions.end()));
m_subscriptions.clear();
m_processPendingPublishes = true; // all publishes are on hold until publishPending is called.
}
setConnectionStatus(ConnectionStatus::Connected);
}
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
785
|
void ClientPaho::onConnectSuccessOnInstance()
|
51becbde
Peter M. Groen
Committed the ent...
|
786
|
{
|
51becbde
Peter M. Groen
Committed the ent...
|
787
788
789
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
// Register the connect callback that is used in reconnect scenarios.
|
0c424e03
Steven
syntax fixes. WIP
|
790
791
|
auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect );
if( MQTTASYNC_SUCCESS != rc )
|
51becbde
Peter M. Groen
Committed the ent...
|
792
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
793
|
LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
794
|
}
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
795
|
|
51becbde
Peter M. Groen
Committed the ent...
|
796
797
798
799
800
801
|
// For MQTTV5
//rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
//if (MQTTASYNC_SUCCESS != rc) {
// // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
//}
// ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
802
|
|
51becbde
Peter M. Groen
Committed the ent...
|
803
804
805
|
m_operationResult[-100] = true;
m_pendingOperations.erase(-100);
}
|
9421324b
Peter M. Groen
First fix on conn...
|
806
|
|
0c424e03
Steven
syntax fixes. WIP
|
807
808
|
setConnectionStatus( ConnectionStatus::Connected );
if( m_connectPromise )
|
51becbde
Peter M. Groen
Committed the ent...
|
809
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
810
|
LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
811
812
813
814
815
|
m_connectPromise->set_value();
}
m_operationsCompleteCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
816
|
void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
817
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
818
819
|
(void) response;
LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
|
51becbde
Peter M. Groen
Committed the ent...
|
820
821
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
51becbde
Peter M. Groen
Committed the ent...
|
822
823
824
825
|
// ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
m_operationResult[-100] = false;
m_pendingOperations.erase(-100);
}
|
ca0cf29e
Steven
syntax fixes, con...
|
826
|
if( ConnectionStatus::ConnectInProgress == m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
827
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
828
|
setConnectionStatus( ConnectionStatus::Disconnected );
|
51becbde
Peter M. Groen
Committed the ent...
|
829
830
831
832
833
834
835
836
837
|
}
m_operationsCompleteCV.notify_all();
}
//void ClientPaho::onDisconnectOnInstance(enum MQTTReasonCodes reasonCode)
//{
// MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode));
//}
|
ca0cf29e
Steven
syntax fixes, con...
|
838
|
void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& )
|
51becbde
Peter M. Groen
Committed the ent...
|
839
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
840
|
LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
841
842
843
844
845
846
847
848
849
850
851
852
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
m_subscriptions.clear();
m_pendingSubscriptions.clear();
m_subscribeTokenToTopic.clear();
m_unsubscribeTokenToTopic.clear();
// ("ClientPaho", "onDisconnectSuccessOnInstance %1 - pending operations : %2, removing all operations", m_clientId, m_pendingOperations);
m_operationResult[-200] = true;
m_pendingOperations.clear();
}
|
0c424e03
Steven
syntax fixes. WIP
|
853
|
setConnectionStatus( ConnectionStatus::Disconnected );
|
51becbde
Peter M. Groen
Committed the ent...
|
854
|
|
0c424e03
Steven
syntax fixes. WIP
|
855
856
|
if( m_disconnectPromise )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
857
858
859
860
861
|
m_disconnectPromise->set_value();
}
m_operationsCompleteCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
862
|
void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
863
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
864
|
(void) response;
|
ca0cf29e
Steven
syntax fixes, con...
|
865
|
LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
866
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
867
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
51becbde
Peter M. Groen
Committed the ent...
|
868
869
870
871
872
|
// ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations);
m_operationResult[-200] = false;
m_pendingOperations.erase(-200);
}
|
0c424e03
Steven
syntax fixes. WIP
|
873
|
if( MQTTAsync_isConnected( m_client ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
874
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
875
|
setConnectionStatus( ConnectionStatus::Connected );
|
51becbde
Peter M. Groen
Committed the ent...
|
876
877
878
|
}
else
{
|
0c424e03
Steven
syntax fixes. WIP
|
879
|
setConnectionStatus( ConnectionStatus::Disconnected );
|
51becbde
Peter M. Groen
Committed the ent...
|
880
881
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
882
|
if( m_disconnectPromise )
|
51becbde
Peter M. Groen
Committed the ent...
|
883
884
885
886
887
888
|
{
m_disconnectPromise->set_value();
}
m_operationsCompleteCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
889
|
void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
890
891
|
{
auto pd = response.publishData();
|
ca0cf29e
Steven
syntax fixes, con...
|
892
|
LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
893
894
895
896
897
898
899
900
901
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
// ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
m_operationResult[response.token()] = true;
m_pendingOperations.erase(response.token());
}
m_operationsCompleteCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
902
|
void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
903
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
904
|
LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
905
906
907
908
909
910
911
912
913
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
// ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
m_operationResult[response.token()] = false;
m_pendingOperations.erase(response.token());
}
m_operationsCompleteCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
914
|
void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
915
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
916
917
918
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) );
OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
|
51becbde
Peter M. Groen
Committed the ent...
|
919
920
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
bool operationOk = false;
|
ca0cf29e
Steven
syntax fixes, con...
|
921
|
OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]()
|
51becbde
Peter M. Groen
Committed the ent...
|
922
923
924
|
{
// ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
m_operationResult[response.token()] = operationOk;
|
ca0cf29e
Steven
syntax fixes, con...
|
925
|
m_pendingOperations.erase( response.token() );
|
51becbde
Peter M. Groen
Committed the ent...
|
926
|
});
|
ca0cf29e
Steven
syntax fixes, con...
|
927
928
|
auto it = m_subscribeTokenToTopic.find( response.token() );
if( m_subscribeTokenToTopic.end() == it )
|
0c424e03
Steven
syntax fixes. WIP
|
929
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
930
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
931
932
933
934
935
|
return;
}
auto topic = it->second;
m_subscribeTokenToTopic.erase(it);
|
ca0cf29e
Steven
syntax fixes, con...
|
936
937
|
auto pendingIt = m_pendingSubscriptions.find( topic );
if( m_pendingSubscriptions.end() == pendingIt )
|
51becbde
Peter M. Groen
Committed the ent...
|
938
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
939
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
940
941
|
return;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
942
|
if( response.qos() != pendingIt->second.qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
943
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
944
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
945
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
946
947
948
949
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) );
m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) );
m_pendingSubscriptions.erase( pendingIt );
|
51becbde
Peter M. Groen
Committed the ent...
|
950
951
952
|
operationOk = true;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
953
|
void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
954
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
955
956
957
|
LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
958
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
959
|
OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
|
51becbde
Peter M. Groen
Committed the ent...
|
960
961
962
|
{
// MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
m_operationResult[response.token()] = false;
|
ca0cf29e
Steven
syntax fixes, con...
|
963
|
m_pendingOperations.erase( response.token() );
|
51becbde
Peter M. Groen
Committed the ent...
|
964
965
|
});
|
ca0cf29e
Steven
syntax fixes, con...
|
966
967
|
auto it = m_subscribeTokenToTopic.find( response.token() );
if( m_subscribeTokenToTopic.end() == it )
|
51becbde
Peter M. Groen
Committed the ent...
|
968
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
969
|
LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
970
971
972
|
return;
}
auto topic = it->second;
|
ca0cf29e
Steven
syntax fixes, con...
|
973
|
m_subscribeTokenToTopic.erase( it );
|
51becbde
Peter M. Groen
Committed the ent...
|
974
|
|
ca0cf29e
Steven
syntax fixes, con...
|
975
976
|
auto pendingIt = m_pendingSubscriptions.find( topic );
if( m_pendingSubscriptions.end() == pendingIt )
|
51becbde
Peter M. Groen
Committed the ent...
|
977
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
978
|
LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
979
980
|
return;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
981
982
|
LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) );
m_pendingSubscriptions.erase( pendingIt );
|
51becbde
Peter M. Groen
Committed the ent...
|
983
984
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
985
|
void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
986
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
987
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
988
|
|
ca0cf29e
Steven
syntax fixes, con...
|
989
|
OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
990
991
992
993
994
995
996
997
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
// On centos7 the unsubscribe response is a nullptr, so we do not have a valid token.
// As a workaround the last unsubscribe token is stored and is used when no valid token is available.
// This is by no means bullet proof because rapid unsubscribes in succession will overwrite this member
// before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled
// sequentially (see ClientPaho::unsubscribe)!
auto token = response.token();
|
ca0cf29e
Steven
syntax fixes, con...
|
998
|
if( -1 == token )
|
51becbde
Peter M. Groen
Committed the ent...
|
999
1000
1001
1002
1003
1004
|
{
token = m_lastUnsubscribe;
m_lastUnsubscribe = -1;
}
bool operationOk = false;
|
ca0cf29e
Steven
syntax fixes, con...
|
1005
|
OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]()
|
51becbde
Peter M. Groen
Committed the ent...
|
1006
1007
1008
|
{
// ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token);
m_operationResult[token] = operationOk;
|
ca0cf29e
Steven
syntax fixes, con...
|
1009
|
m_pendingOperations.erase( token );
|
51becbde
Peter M. Groen
Committed the ent...
|
1010
1011
|
});
|
ca0cf29e
Steven
syntax fixes, con...
|
1012
1013
|
auto it = m_unsubscribeTokenToTopic.find( token );
if( m_unsubscribeTokenToTopic.end() == it )
|
51becbde
Peter M. Groen
Committed the ent...
|
1014
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1015
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
1016
1017
1018
|
return;
}
auto topic = it->second;
|
ca0cf29e
Steven
syntax fixes, con...
|
1019
|
m_unsubscribeTokenToTopic.erase( it );
|
51becbde
Peter M. Groen
Committed the ent...
|
1020
|
|
ca0cf29e
Steven
syntax fixes, con...
|
1021
1022
1023
1024
|
auto registeredIt = m_subscriptions.find( topic );
if( m_subscriptions.end() == registeredIt )
{
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
1025
1026
|
return;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1027
1028
1029
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) );
m_subscriptions.erase( registeredIt );
|
51becbde
Peter M. Groen
Committed the ent...
|
1030
1031
1032
|
operationOk = true;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1033
|
void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1034
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1035
1036
|
LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1037
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
1038
|
OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
|
51becbde
Peter M. Groen
Committed the ent...
|
1039
1040
1041
|
{
// ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
m_operationResult[response.token()] = false;
|
ca0cf29e
Steven
syntax fixes, con...
|
1042
|
m_pendingOperations.erase( response.token() );
|
51becbde
Peter M. Groen
Committed the ent...
|
1043
1044
|
});
|
ca0cf29e
Steven
syntax fixes, con...
|
1045
1046
|
auto it = m_unsubscribeTokenToTopic.find( response.token() );
if( m_unsubscribeTokenToTopic.end() == it )
|
51becbde
Peter M. Groen
Committed the ent...
|
1047
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1048
|
LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
1049
1050
1051
|
return;
}
auto topic = it->second;
|
ca0cf29e
Steven
syntax fixes, con...
|
1052
|
m_unsubscribeTokenToTopic.erase( it );
|
51becbde
Peter M. Groen
Committed the ent...
|
1053
1054
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1055
|
int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message )
|
51becbde
Peter M. Groen
Committed the ent...
|
1056
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1057
|
LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
1058
|
|
ca0cf29e
Steven
syntax fixes, con...
|
1059
|
std::function<void( MqttMessage )> cb;
|
51becbde
Peter M. Groen
Committed the ent...
|
1060
1061
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
1062
|
for( const auto& s : m_subscriptions )
|
51becbde
Peter M. Groen
Committed the ent...
|
1063
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1064
|
if( boost::regex_match( message.topic(), s.second.topicRegex ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
1065
1066
1067
1068
1069
1070
|
{
cb = s.second.callback;
}
}
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1071
|
if( cb )
|
51becbde
Peter M. Groen
Committed the ent...
|
1072
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1073
|
cb( message );
|
51becbde
Peter M. Groen
Committed the ent...
|
1074
1075
1076
|
}
else
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1077
|
LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
1078
1079
1080
1081
|
}
return 1;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1082
|
void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token )
|
51becbde
Peter M. Groen
Committed the ent...
|
1083
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1084
1085
|
LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) );
if( m_deliveryCompleteCallback )
|
51becbde
Peter M. Groen
Committed the ent...
|
1086
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1087
|
m_deliveryCompleteCallback( m_clientId, static_cast<std::int32_t>( token ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
1088
1089
1090
|
}
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1091
|
void ClientPaho::onConnectionLostOnInstance( const std::string& cause )
|
51becbde
Peter M. Groen
Committed the ent...
|
1092
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1093
|
(void) cause;
|
51becbde
Peter M. Groen
Committed the ent...
|
1094
|
// ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause);
|
ca0cf29e
Steven
syntax fixes, con...
|
1095
|
setConnectionStatus( ConnectionStatus::ReconnectInProgress );
|
51becbde
Peter M. Groen
Committed the ent...
|
1096
1097
1098
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
// Remove all tokens related to subscriptions from the active operations.
|
ca0cf29e
Steven
syntax fixes, con...
|
1099
|
for( const auto& p : m_subscribeTokenToTopic )
|
51becbde
Peter M. Groen
Committed the ent...
|
1100
1101
|
{
// ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
|
ca0cf29e
Steven
syntax fixes, con...
|
1102
|
m_pendingOperations.erase( p.first );
|
51becbde
Peter M. Groen
Committed the ent...
|
1103
1104
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1105
|
for( const auto& p : m_unsubscribeTokenToTopic )
|
51becbde
Peter M. Groen
Committed the ent...
|
1106
1107
|
{
// ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
|
ca0cf29e
Steven
syntax fixes, con...
|
1108
|
m_pendingOperations.erase( p.first );
|
51becbde
Peter M. Groen
Committed the ent...
|
1109
1110
1111
1112
1113
1114
1115
|
}
// Clear the administration used in the subscribe process.
m_subscribeTokenToTopic.clear();
m_unsubscribeTokenToTopic.clear();
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1116
|
void ClientPaho::onFirstConnect( void* context, char* cause )
|
9421324b
Peter M. Groen
First fix on conn...
|
1117
1118
|
{
LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." );
|
ca0cf29e
Steven
syntax fixes, con...
|
1119
|
if( context )
|
9421324b
Peter M. Groen
First fix on conn...
|
1120
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1121
1122
1123
|
auto *cl = reinterpret_cast<ClientPaho*>( context );
std::string reason( nullptr == cause ? "Unknown cause" : cause );
cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } );
|
9421324b
Peter M. Groen
First fix on conn...
|
1124
1125
1126
|
}
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1127
|
void ClientPaho::onConnect( void* context, char* cause )
|
51becbde
Peter M. Groen
Committed the ent...
|
1128
|
{
|
9421324b
Peter M. Groen
First fix on conn...
|
1129
|
LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." );
|
ca0cf29e
Steven
syntax fixes, con...
|
1130
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1131
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1132
1133
1134
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
std::string reason( nullptr == cause ? "unknown cause" : cause );
cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1135
1136
1137
1138
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1139
|
void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1140
|
{
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
1141
|
LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." );
|
ca0cf29e
Steven
syntax fixes, con...
|
1142
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1143
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1144
1145
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
if( !response )
|
76d01373
Peter M. Groen
Fix on connection
|
1146
|
{
|
51becbde
Peter M. Groen
Committed the ent...
|
1147
|
// connect should always have a valid response struct.
|
ca0cf29e
Steven
syntax fixes, con...
|
1148
|
LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" );
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
1149
|
return;
|
51becbde
Peter M. Groen
Committed the ent...
|
1150
|
}
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
1151
|
// MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
|
ca0cf29e
Steven
syntax fixes, con...
|
1152
|
cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1153
1154
1155
1156
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1157
|
void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1158
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1159
1160
|
LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) );
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1161
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1162
1163
1164
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttFailure resp( response );
cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
|
}
}
//// static
//void ClientPaho::onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode)
//{
// apply_unused_parameters(properties);
// try {
// if (context) {
// auto* cl = reinterpret_cast<ClientPaho*>(context);
// cl->pushIncomingEvent([cl, reasonCode]() { cl->onDisconnectOnInstance(reasonCode); });
// }
// }
// catch (...) {
// }
// catch (const std::exception& e) {
// MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - exception : %1", e.what());
// }
// catch (...) {
// MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - unknown exception");
// }
//}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1189
|
void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1190
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1191
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1192
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1193
1194
1195
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttSuccess resp( response ? response->token : 0 );
cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1196
1197
1198
1199
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1200
|
void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1201
|
{
|
9421324b
Peter M. Groen
First fix on conn...
|
1202
|
LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." );
|
ca0cf29e
Steven
syntax fixes, con...
|
1203
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1204
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1205
1206
1207
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttFailure resp( response );
cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1208
1209
1210
1211
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1212
|
void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1213
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1214
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1215
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1216
1217
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
if( !response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
|
{
// publish should always have a valid response struct.
// toLogFile ("ClientPaho", "onPublishSuccess - no response data");
}
MqttSuccess resp(response->token, MqttMessage(response->alt.pub.destinationName == nullptr ? "null" : response->alt.pub.destinationName, response->alt.pub.message));
cl->pushIncomingEvent([cl, resp]() { cl->onPublishSuccessOnInstance(resp); });
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1228
|
void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1229
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1230
1231
|
(void) response;
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1232
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1233
1234
1235
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttFailure resp( response );
cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1236
1237
1238
1239
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1240
|
void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1241
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1242
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1243
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1244
1245
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
if( !response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1246
1247
1248
1249
|
{
// subscribe should always have a valid response struct.
// MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data");
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1250
1251
|
MqttSuccess resp( response->token, response->alt.qos );
cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1252
1253
1254
1255
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1256
|
void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1257
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1258
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1259
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1260
1261
1262
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttFailure resp( response );
cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1263
1264
1265
1266
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1267
|
void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1268
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1269
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1270
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1271
1272
1273
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttSuccess resp( response ? response->token : -1 );
cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1274
1275
1276
1277
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1278
|
void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1279
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1280
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1281
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1282
1283
1284
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttFailure resp( response );
cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1285
1286
1287
1288
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1289
|
int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message )
|
51becbde
Peter M. Groen
Committed the ent...
|
1290
1291
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1292
|
OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]()
|
51becbde
Peter M. Groen
Committed the ent...
|
1293
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1294
1295
|
MQTTAsync_freeMessage( &message );
MQTTAsync_free( topicName );
|
51becbde
Peter M. Groen
Committed the ent...
|
1296
1297
|
});
|
ca0cf29e
Steven
syntax fixes, con...
|
1298
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1299
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1300
1301
1302
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttMessage msg( topicName, *message );
cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1303
1304
1305
1306
1307
1308
|
}
return 1; // always return true. Otherwise this callback is triggered again.
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1309
|
void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token )
|
51becbde
Peter M. Groen
Committed the ent...
|
1310
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1311
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1312
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1313
1314
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1315
1316
1317
1318
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1319
|
void ClientPaho::onConnectionLost( void* context, char* cause )
|
51becbde
Peter M. Groen
Committed the ent...
|
1320
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1321
|
OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]()
|
51becbde
Peter M. Groen
Committed the ent...
|
1322
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1323
|
if( cause )
|
51becbde
Peter M. Groen
Committed the ent...
|
1324
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1325
|
MQTTAsync_free( cause );
|
51becbde
Peter M. Groen
Committed the ent...
|
1326
1327
1328
|
}
});
|
ca0cf29e
Steven
syntax fixes, con...
|
1329
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1330
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1331
1332
1333
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
std::string msg( nullptr == cause ? "cause unknown" : cause );
cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1334
1335
1336
1337
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1338
|
void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message )
|
51becbde
Peter M. Groen
Committed the ent...
|
1339
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1340
1341
|
(void) message;
switch( level )
|
51becbde
Peter M. Groen
Committed the ent...
|
1342
1343
1344
|
{
case MQTTASYNC_TRACE_MAXIMUM:
case MQTTASYNC_TRACE_MEDIUM:
|
ca0cf29e
Steven
syntax fixes, con...
|
1345
1346
|
case MQTTASYNC_TRACE_MINIMUM:
{
|
51becbde
Peter M. Groen
Committed the ent...
|
1347
1348
1349
|
// ("ClientPaho", "paho - %1", message)
break;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1350
1351
|
case MQTTASYNC_TRACE_PROTOCOL:
{
|
51becbde
Peter M. Groen
Committed the ent...
|
1352
1353
1354
1355
1356
|
// ("ClientPaho", "paho - %1", message)
break;
}
case MQTTASYNC_TRACE_ERROR:
case MQTTASYNC_TRACE_SEVERE:
|
ca0cf29e
Steven
syntax fixes, con...
|
1357
1358
|
case MQTTASYNC_TRACE_FATAL:
{
|
51becbde
Peter M. Groen
Committed the ent...
|
1359
1360
1361
1362
1363
|
// ("ClientPaho", "paho - %1", message)
break;
}
}
}
|