b5d9e433
Peter M. Groen
Fixed License Hea...
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
/* ****************************************************************************
* Copyright 2019 Open Systems Development BV *
* *
* Permission is hereby granted, free of charge, to any person obtaining a *
* copy of this software and associated documentation files (the "Software"), *
* to deal in the Software without restriction, including without limitation *
* the rights to use, copy, modify, merge, publish, distribute, sublicense, *
* and/or sell copies of the Software, and to permit persons to whom the *
* Software is furnished to do so, subject to the following conditions: *
* *
* The above copyright notice and this permission notice shall be included in *
* all copies or substantial portions of the Software. *
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *
* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *
* DEALINGS IN THE SOFTWARE. *
* ***************************************************************************/
|
51becbde
Peter M. Groen
Committed the ent...
|
22
23
24
25
26
|
#include "clientpaho.h"
#include "errorcode.h"
#include "mqttutil.h"
#include "lockguard.h"
|
9421324b
Peter M. Groen
First fix on conn...
|
27
|
#include "log.h"
|
51becbde
Peter M. Groen
Committed the ent...
|
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
#include "metaprogrammingdefs.h"
#include "mqttstream.h"
#include "scopeguard.h"
#include "uriparser.h"
// std::chrono
#include "compat-chrono.h"
// std
#include <algorithm>
#include <iterator>
using namespace osdev::components::mqtt;
namespace {
#if defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-template"
#endif
OSDEV_COMPONENTS_HASMEMBER_TRAIT(onSuccess5)
/// Overload chosen when has_onSuccess5<TRet>::value is false.
/// Produces a TRet filled from the plain MQTTAsync_disconnectOptions_initializer macro.
/// The pointer argument is unused; it only drives overload selection.
template <typename TRet>
inline typename std::enable_if<!has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
{
    TRet options = MQTTAsync_disconnectOptions_initializer;
    return options;
}
/// Overload chosen when has_onSuccess5<TRet>::value is true.
///
/// Background (kept from the original notes):
///  - g++ on centos7 evaluates this body even when the overload is discarded by SFINAE,
///    which gave a compile error on an undefined symbol. The old initializer macro is
///    therefore used as the fallback, although this overload should not be chosen when
///    the struct has no onSuccess5 member.
///  - On yocto warrior (mqtt-paho-c 1.3.0) the struct has an onSuccess5 member but the
///    macro MQTTAsync_disconnectOptions_initializer5 is not defined. In that case the
///    plain initializer is used and the additional fields are set by hand, which needs
///    the missing-field-initializers warning to be silenced.
template <typename TRet>
inline typename std::enable_if<has_onSuccess5<TRet>::value, TRet>::type initializeMqttStruct(TRet*)
{
#ifdef MQTTAsync_disconnectOptions_initializer5
    // The library provides the dedicated initializer: use it as is.
    return MQTTAsync_disconnectOptions_initializer5;
#else
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmissing-field-initializers"
    TRet options = MQTTAsync_disconnectOptions_initializer;
    options.struct_version = 1;
    options.onSuccess5 = nullptr;
    options.onFailure5 = nullptr;
    return options;
#pragma GCC diagnostic pop
#endif
}
/// Thin dispatcher: Init<T>::initialize() returns a T produced by the
/// initializeMqttStruct overload that matches T.
template <typename TRet>
struct Init
{
    static TRet initialize()
    {
        // Only the static type of the pointer matters; it is passed as a null tag.
        TRet* const overloadTag = nullptr;
        return initializeMqttStruct<TRet>(overloadTag);
    }
};
#if defined(__clang__)
#pragma GCC diagnostic pop
#endif
} // namespace
std::atomic_int ClientPaho::s_numberOfInstances(0);
|
ca0cf29e
Steven
syntax fixes, con...
|
97
|
/// Constructs the client: parses the endpoint, creates the Paho async handle and,
/// when creation succeeds, registers the Paho callbacks and starts the worker thread.
///
/// @param _endpoint                 Endpoint uri; handed to parseEndpoint().
/// @param _id                       Client id, stored in m_clientId.
/// @param connectionStatusCallback  Stored in m_connectionStatusCallback.
/// @param deliveryCompleteCallback  Stored in m_deliveryCompleteCallback.
///
/// When MQTTAsync_create fails the error is logged and neither callbacks nor
/// worker thread are set up.
ClientPaho::ClientPaho( const std::string& _endpoint,
                        const std::string& _id,
                        const std::function<void( const std::string&, ConnectionStatus )>& connectionStatusCallback,
                        const std::function<void( const std::string& clientId, std::int32_t pubMsgToken )>& deliveryCompleteCallback )
    : m_mutex()
    , m_endpoint()
    , m_username()
    , m_password()
    , m_clientId(_id)
    , m_pendingOperations()
    , m_operationResult()
    , m_operationsCompleteCV()
    , m_subscriptions()
    , m_pendingSubscriptions()
    , m_subscribeTokenToTopic()
    , m_unsubscribeTokenToTopic()
    , m_pendingPublishes()
    , m_processPendingPublishes(false)
    , m_pendingPublishesReadyCV()
    , m_client()
    , m_connectionStatus(ConnectionStatus::Disconnected)
    , m_connectionStatusCallback(connectionStatusCallback)
    , m_deliveryCompleteCallback(deliveryCompleteCallback)
    , m_lastUnsubscribe(-1)
    , m_connectPromise()
    , m_disconnectPromise()
    , m_callbackEventQueue(m_clientId)
    , m_workerThread()
{
    // The instance that raises the counter from zero installs the trace callback.
    const bool isFirstInstance = ( s_numberOfInstances.fetch_add( 1 ) == 0 );
    if( isFirstInstance )
    {
        MQTTAsync_setTraceCallback( &ClientPaho::onLogPaho );
    }

    LogDebug( "[ClientPaho::ClientPaho]", std::string( " " + m_clientId + " - ctor ClientPaho " ) );

    // Fills m_endpoint (and m_username / m_password when present in the uri).
    parseEndpoint( _endpoint );

    const auto createResult = MQTTAsync_create( &m_client, m_endpoint.c_str(), m_clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr );
    if( MQTTASYNC_SUCCESS != createResult )
    {
        LogError( "[ClientPaho::ClientPaho]", std::string( m_clientId + " - Failed to create client for endpoint " + m_endpoint + ", return code " + pahoAsyncErrorCodeToString( createResult ) ) );
        return;
    }

    // The object itself is the callback context.
    MQTTAsync_setCallbacks( m_client, reinterpret_cast<void*>(this), ClientPaho::onConnectionLost, ClientPaho::onMessageArrived, ClientPaho::onDeliveryComplete );
    m_workerThread = std::thread( &ClientPaho::callbackEventHandler, this );
}
/// Tears the client down: unsubscribes and disconnects when still connected,
/// removes the trace callback when this is the last instance, destroys the Paho
/// handle and finally stops the callback queue and joins the worker thread.
ClientPaho::~ClientPaho()
{
    LogDebug( "[ClientPaho::~ClientPaho]", std::string( m_clientId + " - destructor ClientPaho" ) );

    const bool stillConnected = MQTTAsync_isConnected( m_client );
    if( !stillConnected )
    {
        // If the status was already disconnected this call does nothing
        setConnectionStatus( ConnectionStatus::Disconnected );
    }
    else
    {
        this->unsubscribeAll();
        // Empty token set: wait (at most 2 s) until no operation is pending anymore.
        this->waitForCompletion( std::chrono::milliseconds(2000), std::set<int32_t>{} );
        this->disconnect( true, 5000 );
    }

    const bool wasLastInstance = ( s_numberOfInstances.fetch_sub( 1 ) == 1 );
    if( wasLastInstance )
    {
        // encountered a case where termination of the logging system within paho led to a segfault.
        // This was a paho thread that was cleaned while at the same time the logging system was terminated.
        // Removing the trace callback will not solve the underlying problem but hopefully will trigger it less
        // frequently.
        MQTTAsync_setTraceCallback( nullptr );
    }

    MQTTAsync_destroy( &m_client );

    m_callbackEventQueue.stop();
    if( m_workerThread.joinable() )
    {
        m_workerThread.join();
    }
}
/// @return A copy of the client id this instance was constructed with.
std::string ClientPaho::clientId() const
{
    std::string id( m_clientId );
    return id;
}
/// @return The current value of m_connectionStatus.
ConnectionStatus ClientPaho::connectionStatus() const
{
    const ConnectionStatus current = m_connectionStatus;
    return current;
}
|
31eece9b
Steven
added LWT ( last ...
|
190
|
std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
|
51becbde
Peter M. Groen
Committed the ent...
|
191
192
193
|
{
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
2b967ea7
Steven
promise is not wo...
|
194
|
if( ConnectionStatus::Disconnected != m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
195
196
197
|
{
return -1;
}
|
2b967ea7
Steven
promise is not wo...
|
198
|
setConnectionStatus( ConnectionStatus::ConnectInProgress );
|
51becbde
Peter M. Groen
Committed the ent...
|
199
200
|
}
|
76d01373
Peter M. Groen
Fix on connection
|
201
202
|
LogInfo( "[ClientPaho::connect]", std::string( m_clientId + " - start connect to endpoint " + m_endpoint ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
203
|
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
|
9421324b
Peter M. Groen
First fix on conn...
|
204
|
conn_opts.keepAliveInterval = 5;
|
51becbde
Peter M. Groen
Committed the ent...
|
205
|
conn_opts.cleansession = 1;
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
206
|
conn_opts.onSuccess = nullptr;
|
51becbde
Peter M. Groen
Committed the ent...
|
207
208
209
|
conn_opts.onFailure = &ClientPaho::onConnectFailure;
conn_opts.context = this;
conn_opts.automaticReconnect = 1;
|
31eece9b
Steven
added LWT ( last ...
|
210
|
|
76d01373
Peter M. Groen
Fix on connection
|
211
212
213
214
|
// Make sure we get a signal if the promise is fulfilled
auto ccb = MQTTAsync_setConnected( m_client, reinterpret_cast<void*>(this), ClientPaho::onFirstConnect );
if( MQTTASYNC_SUCCESS == ccb )
{
|
ca0cf29e
Steven
syntax fixes, con...
|
215
|
LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback SUCCEEDED.") );
|
76d01373
Peter M. Groen
Fix on connection
|
216
217
218
|
}
else
{
|
ca0cf29e
Steven
syntax fixes, con...
|
219
|
LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Setting the extra onConnected callback FAILED.") );
|
76d01373
Peter M. Groen
Fix on connection
|
220
221
222
|
}
// Setup the last will and testament, if so desired.
|
31eece9b
Steven
added LWT ( last ...
|
223
224
225
226
227
228
229
|
if( !lwt.topic().empty() )
{
MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
will_opts.message = lwt.message().c_str();
will_opts.topicName = lwt.topic().c_str();
conn_opts.will = &will_opts;
|
76d01373
Peter M. Groen
Fix on connection
|
230
231
|
LogDebug( "[ClientPaho::connect]", std::string( m_clientId + " - Set Last will and testament. Topic : " + lwt.topic() + " => Message : " + lwt.message() ) );
|
31eece9b
Steven
added LWT ( last ...
|
232
233
234
235
236
237
|
}
else
{
conn_opts.will = nullptr;
}
|
2b967ea7
Steven
promise is not wo...
|
238
|
if( !m_username.empty() )
|
51becbde
Peter M. Groen
Committed the ent...
|
239
240
241
242
|
{
conn_opts.username = m_username.c_str();
}
|
2b967ea7
Steven
promise is not wo...
|
243
|
if( !m_password.empty() )
|
51becbde
Peter M. Groen
Committed the ent...
|
244
245
246
247
248
249
250
|
{
conn_opts.password = m_password.c_str();
}
std::promise<void> waitForConnectPromise{};
auto waitForConnect = waitForConnectPromise.get_future();
m_connectPromise.reset();
|
2b967ea7
Steven
promise is not wo...
|
251
|
if( wait )
|
51becbde
Peter M. Groen
Committed the ent...
|
252
|
{
|
2b967ea7
Steven
promise is not wo...
|
253
|
m_connectPromise = std::make_unique<std::promise<void>>( std::move( waitForConnectPromise ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
254
255
256
|
}
{
|
2b967ea7
Steven
promise is not wo...
|
257
258
|
OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
if( !m_pendingOperations.insert( -100 ).second )
|
51becbde
Peter M. Groen
Committed the ent...
|
259
260
261
|
{
// Write something
}
|
2b967ea7
Steven
promise is not wo...
|
262
|
m_operationResult.erase( -100 );
|
51becbde
Peter M. Groen
Committed the ent...
|
263
264
|
}
|
2b967ea7
Steven
promise is not wo...
|
265
266
|
int rc = MQTTAsync_connect( m_client, &conn_opts );
if( MQTTASYNC_SUCCESS != rc )
|
51becbde
Peter M. Groen
Committed the ent...
|
267
|
{
|
2b967ea7
Steven
promise is not wo...
|
268
269
|
setConnectionStatus( ConnectionStatus::Disconnected );
OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
|
51becbde
Peter M. Groen
Committed the ent...
|
270
271
272
273
|
m_operationResult[-100] = false;
m_pendingOperations.erase(-100);
}
|
2b967ea7
Steven
promise is not wo...
|
274
|
if( wait )
|
51becbde
Peter M. Groen
Committed the ent...
|
275
276
277
278
279
280
281
|
{
waitForConnect.get();
m_connectPromise.reset();
}
return -100;
}
|
0c424e03
Steven
syntax fixes. WIP
|
282
|
std::int32_t ClientPaho::disconnect( bool wait, int timeoutMs )
|
51becbde
Peter M. Groen
Committed the ent...
|
283
284
285
286
287
|
{
ConnectionStatus currentStatus = m_connectionStatus;
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
0c424e03
Steven
syntax fixes. WIP
|
288
289
|
if( ConnectionStatus::Disconnected == m_connectionStatus || ConnectionStatus::DisconnectInProgress == m_connectionStatus )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
290
291
292
293
|
return -1;
}
currentStatus = m_connectionStatus;
|
0c424e03
Steven
syntax fixes. WIP
|
294
|
setConnectionStatus( ConnectionStatus::DisconnectInProgress );
|
51becbde
Peter M. Groen
Committed the ent...
|
295
296
297
298
299
300
301
302
303
304
305
|
}
MQTTAsync_disconnectOptions disconn_opts = Init<MQTTAsync_disconnectOptions>::initialize();
disconn_opts.timeout = timeoutMs;
disconn_opts.onSuccess = &ClientPaho::onDisconnectSuccess;
disconn_opts.onFailure = &ClientPaho::onDisconnectFailure;
disconn_opts.context = this;
std::promise<void> waitForDisconnectPromise{};
auto waitForDisconnect = waitForDisconnectPromise.get_future();
m_disconnectPromise.reset();
|
0c424e03
Steven
syntax fixes. WIP
|
306
307
|
if( wait )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
308
309
310
311
312
|
m_disconnectPromise = std::make_unique<std::promise<void>>(std::move(waitForDisconnectPromise));
}
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
313
|
if( !m_pendingOperations.insert( -200 ).second )
|
51becbde
Peter M. Groen
Committed the ent...
|
314
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
315
|
LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " disconnect - token" + std::to_string( -200 ) + "already in use" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
316
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
317
|
m_operationResult.erase( -200 );
|
51becbde
Peter M. Groen
Committed the ent...
|
318
319
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
320
|
int rc = MQTTAsync_disconnect( m_client, &disconn_opts );
|
0c424e03
Steven
syntax fixes. WIP
|
321
|
if( MQTTASYNC_SUCCESS != rc )
|
76d01373
Peter M. Groen
Fix on connection
|
322
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
323
|
if( MQTTASYNC_DISCONNECTED == rc )
|
76d01373
Peter M. Groen
Fix on connection
|
324
|
{
|
51becbde
Peter M. Groen
Committed the ent...
|
325
326
327
|
currentStatus = ConnectionStatus::Disconnected;
}
|
0c424e03
Steven
syntax fixes. WIP
|
328
329
|
setConnectionStatus( currentStatus );
OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
|
51becbde
Peter M. Groen
Committed the ent...
|
330
|
m_operationResult[-200] = false;
|
ca0cf29e
Steven
syntax fixes, con...
|
331
|
m_pendingOperations.erase( -200 );
|
51becbde
Peter M. Groen
Committed the ent...
|
332
|
|
0c424e03
Steven
syntax fixes. WIP
|
333
|
if( MQTTASYNC_DISCONNECTED == rc )
|
2b967ea7
Steven
promise is not wo...
|
334
|
{
|
51becbde
Peter M. Groen
Committed the ent...
|
335
336
|
return -1;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
337
|
LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - failed to disconnect - return code " + pahoAsyncErrorCodeToString( rc ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
338
339
|
}
|
2b967ea7
Steven
promise is not wo...
|
340
341
|
if( wait )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
342
343
|
if (std::future_status::timeout == waitForDisconnect.wait_for(std::chrono::milliseconds(timeoutMs + 100)))
{
|
ca0cf29e
Steven
syntax fixes, con...
|
344
|
LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - timeout occurred on disconnect" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
345
346
347
348
349
350
351
|
}
waitForDisconnect.get();
m_disconnectPromise.reset();
}
return -200;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
352
|
std::int32_t ClientPaho::publish( const MqttMessage& message, int qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
353
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
354
|
if( ConnectionStatus::DisconnectInProgress == m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
355
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
356
|
LogDebug( "[ClientPaho::publish]", std::string( m_clientId + " - disconnect in progress, ignoring publish with qos " + std::to_string( qos ) + " on topic " + message.topic() ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
357
358
|
return -1;
}
|
0c424e03
Steven
syntax fixes. WIP
|
359
|
else if( ConnectionStatus::Disconnected == m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
360
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
361
|
LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - unable to publish, not connected" ) );
|
2b967ea7
Steven
promise is not wo...
|
362
|
connect( true );
|
51becbde
Peter M. Groen
Committed the ent...
|
363
364
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
365
|
if( !isValidTopic(message.topic() ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
366
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
367
|
LogDebug( "[ClientPaho::disconnect]", std::string( m_clientId + " - topic " + message.topic() + " is invalid" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
368
369
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
370
|
if( qos > 2 )
|
51becbde
Peter M. Groen
Committed the ent...
|
371
372
373
|
{
qos = 2;
}
|
0c424e03
Steven
syntax fixes. WIP
|
374
|
else if( qos < 0 )
|
51becbde
Peter M. Groen
Committed the ent...
|
375
376
377
378
|
{
qos = 0;
}
|
51becbde
Peter M. Groen
Committed the ent...
|
379
|
std::unique_lock<std::mutex> lck(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
380
|
if( ConnectionStatus::ReconnectInProgress == m_connectionStatus || m_processPendingPublishes )
|
a670240b
Peter M. Groen
Fix on connection
|
381
|
{
|
51becbde
Peter M. Groen
Committed the ent...
|
382
|
m_pendingPublishesReadyCV.wait(lck, [this]() { return !m_processPendingPublishes; });
|
ca0cf29e
Steven
syntax fixes, con...
|
383
|
if( ConnectionStatus::ReconnectInProgress == m_connectionStatus )
|
a670240b
Peter M. Groen
Fix on connection
|
384
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
385
|
LogDebug( "[ClientPaho::publish]", "Adding publish to pending queue." );
|
ca0cf29e
Steven
syntax fixes, con...
|
386
|
m_pendingPublishes.push_front( Publish{ qos, message } );
|
51becbde
Peter M. Groen
Committed the ent...
|
387
388
389
390
|
return -1;
}
}
|
0c424e03
Steven
syntax fixes. WIP
|
391
|
return publishInternal( message, qos );
|
51becbde
Peter M. Groen
Committed the ent...
|
392
393
394
395
396
397
|
}
void ClientPaho::publishPending()
{
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
0c424e03
Steven
syntax fixes. WIP
|
398
|
if( !m_processPendingPublishes )
|
2b967ea7
Steven
promise is not wo...
|
399
|
{
|
51becbde
Peter M. Groen
Committed the ent...
|
400
401
402
403
|
return;
}
}
|
0c424e03
Steven
syntax fixes. WIP
|
404
|
if( ConnectionStatus::Connected != m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
405
|
{
|
a670240b
Peter M. Groen
Fix on connection
|
406
|
LogInfo( "[ClientPaho::publishPending]", std::string( m_clientId + " - " ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
407
408
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
409
|
while( !m_pendingPublishes.empty() )
|
51becbde
Peter M. Groen
Committed the ent...
|
410
411
|
{
const auto& pub = m_pendingPublishes.back();
|
ca0cf29e
Steven
syntax fixes, con...
|
412
|
publishInternal( pub.data, pub.qos );
|
51becbde
Peter M. Groen
Committed the ent...
|
413
414
415
416
417
418
419
420
421
422
423
|
m_pendingPublishes.pop_back();
}
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
m_processPendingPublishes = false;
}
m_pendingPublishesReadyCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
424
|
std::int32_t ClientPaho::subscribe( const std::string& topic, int qos, const std::function<void(MqttMessage msg)>& cb )
|
51becbde
Peter M. Groen
Committed the ent...
|
425
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
426
|
if( ConnectionStatus::Connected != m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
427
|
{
|
11fe0b09
Peter M. Groen
Rework for subscr...
|
428
|
LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Client not connected..." ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
429
430
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
431
|
if( !isValidTopic( topic ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
432
433
|
{
// ("ClientPaho", "%1 - topic %2 is invalid", m_clientId, topic);
|
11fe0b09
Peter M. Groen
Rework for subscr...
|
434
435
|
LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Topic " + topic + " is invalid." ) );
return -1;
|
51becbde
Peter M. Groen
Committed the ent...
|
436
437
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
438
|
if( qos > 2 )
|
51becbde
Peter M. Groen
Committed the ent...
|
439
440
441
|
{
qos = 2;
}
|
0c424e03
Steven
syntax fixes. WIP
|
442
|
else if( qos < 0 )
|
51becbde
Peter M. Groen
Committed the ent...
|
443
444
445
446
447
448
449
450
|
{
qos = 0;
}
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
auto itExisting = m_subscriptions.find(topic);
|
0c424e03
Steven
syntax fixes. WIP
|
451
452
453
454
|
if( m_subscriptions.end() != itExisting )
{
if( itExisting->second.qos == qos )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
455
456
457
458
459
460
|
return -1;
}
// (OverlappingTopicException, "existing subscription with same topic, but different qos", topic);
}
auto itPending = m_pendingSubscriptions.find(topic);
|
0c424e03
Steven
syntax fixes. WIP
|
461
462
463
464
465
466
467
|
if( m_pendingSubscriptions.end() != itPending )
{
if( itPending->second.qos == qos )
{
auto itToken = std::find_if( m_subscribeTokenToTopic.begin(), m_subscribeTokenToTopic.end(), [&topic](const std::pair<MQTTAsync_token, std::string>& item) { return topic == item.second; } );
if( m_subscribeTokenToTopic.end() != itToken )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
468
469
|
return itToken->first;
}
|
0c424e03
Steven
syntax fixes. WIP
|
470
471
|
else
{
|
51becbde
Peter M. Groen
Committed the ent...
|
472
473
474
475
476
477
478
|
return -1;
}
}
// (OverlappingTopicException, "pending subscription with same topic, but different qos", topic);
}
std::string existingTopic{};
|
0c424e03
Steven
syntax fixes. WIP
|
479
|
if( isOverlappingInternal( topic, existingTopic ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
480
481
|
{
// (OverlappingTopicException, "overlapping topic", existingTopic, topic);
|
11fe0b09
Peter M. Groen
Rework for subscr...
|
482
|
LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " - Overlapping topic : Existing Topic : " + existingTopic + " => New Topic : " + topic ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
483
484
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
485
486
|
LogDebug( "[ClientPaho::subscribe]", std::string( m_clientId + " -adding subscription on topic " + topic + " to the pending subscriptions" ) );
m_pendingSubscriptions.emplace( std::make_pair( topic, Subscription{ qos, boost::regex( convertTopicToRegex( topic ) ), cb } ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
487
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
488
|
return subscribeInternal( topic, qos );
|
51becbde
Peter M. Groen
Committed the ent...
|
489
490
491
492
|
}
void ClientPaho::resubscribe()
{
|
ca0cf29e
Steven
syntax fixes, con...
|
493
|
decltype( m_pendingSubscriptions ) pendingSubscriptions{};
|
51becbde
Peter M. Groen
Committed the ent...
|
494
495
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
496
|
std::copy( m_pendingSubscriptions.begin(), m_pendingSubscriptions.end(), std::inserter(pendingSubscriptions, pendingSubscriptions.end() ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
497
498
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
499
|
for( const auto& s : pendingSubscriptions )
|
51becbde
Peter M. Groen
Committed the ent...
|
500
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
501
|
subscribeInternal( s.first, s.second.qos );
|
51becbde
Peter M. Groen
Committed the ent...
|
502
503
504
|
}
}
|
31eece9b
Steven
added LWT ( last ...
|
505
|
std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
506
507
|
{
{
|
0c424e03
Steven
syntax fixes. WIP
|
508
|
OSDEV_COMPONENTS_LOCKGUARD( m_mutex );
|
51becbde
Peter M. Groen
Committed the ent...
|
509
|
bool found = false;
|
0c424e03
Steven
syntax fixes. WIP
|
510
|
for( const auto& s : m_subscriptions )
|
51becbde
Peter M. Groen
Committed the ent...
|
511
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
512
|
if( topic == s.first && qos == s.second.qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
513
514
515
516
517
|
{
found = true;
break;
}
}
|
0c424e03
Steven
syntax fixes. WIP
|
518
|
if( !found )
|
51becbde
Peter M. Groen
Committed the ent...
|
519
520
521
522
523
524
525
526
527
528
529
530
531
532
|
{
return -1;
}
}
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &ClientPaho::onUnsubscribeSuccess;
opts.onFailure = &ClientPaho::onUnsubscribeFailure;
opts.context = this;
{
// Need to lock the mutex because it is possible that the callback is faster than
// the insertion of the token into the pending operations.
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
533
|
auto rc = MQTTAsync_unsubscribe( m_client, topic.c_str(), &opts );
|
0c424e03
Steven
syntax fixes. WIP
|
534
|
if( MQTTASYNC_SUCCESS != rc )
|
51becbde
Peter M. Groen
Committed the ent...
|
535
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
536
|
LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - unsubscribe on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
537
538
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
539
|
if( !m_pendingOperations.insert( opts.token ).second )
|
51becbde
Peter M. Groen
Committed the ent...
|
540
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
541
|
LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " unsubscribe - token " + std::to_string( opts.token ) + " already in use" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
542
543
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
544
545
|
m_operationResult.erase( opts.token );
if( m_unsubscribeTokenToTopic.count( opts.token ) > 0 )
|
51becbde
Peter M. Groen
Committed the ent...
|
546
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
547
|
LogDebug( "[ClientPaho::unsubscribe]", std::string( m_clientId + " - token already in use, replacing unsubscribe from topic " + m_unsubscribeTokenToTopic[opts.token] + " with " + topic ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
548
549
550
551
552
553
554
555
556
557
558
559
560
|
}
m_lastUnsubscribe = opts.token; // centos7 workaround
m_unsubscribeTokenToTopic[opts.token] = topic;
}
// Because of a bug in paho-c on centos7 the unsubscribes need to be sequential (best effort).
this->waitForCompletion(std::chrono::seconds(1), std::set<int32_t>{ opts.token });
return opts.token;
}
void ClientPaho::unsubscribeAll()
{
|
0c424e03
Steven
syntax fixes. WIP
|
561
|
decltype( m_subscriptions ) subscriptions{};
|
51becbde
Peter M. Groen
Committed the ent...
|
562
563
564
565
566
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
subscriptions = m_subscriptions;
}
|
0c424e03
Steven
syntax fixes. WIP
|
567
568
|
for( const auto& s : subscriptions )
{
|
ca0cf29e
Steven
syntax fixes, con...
|
569
|
this->unsubscribe( s.first, s.second.qos );
|
51becbde
Peter M. Groen
Committed the ent...
|
570
571
572
573
574
|
}
}
std::chrono::milliseconds ClientPaho::waitForCompletion(std::chrono::milliseconds waitFor, const std::set<std::int32_t>& tokens) const
{
|
ca0cf29e
Steven
syntax fixes, con...
|
575
576
577
|
if( waitFor <= std::chrono::milliseconds( 0 ) )
{
return std::chrono::milliseconds( 0 );
|
51becbde
Peter M. Groen
Committed the ent...
|
578
579
580
|
}
std::chrono::milliseconds timeElapsed{};
{
|
ca0cf29e
Steven
syntax fixes, con...
|
581
|
osdev::components::mqtt::measurement::TimeMeasurement msr( "waitForCompletion", [&timeElapsed](const std::string&, std::chrono::steady_clock::time_point, std::chrono::microseconds sinceStart, std::chrono::microseconds )
|
51becbde
Peter M. Groen
Committed the ent...
|
582
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
583
|
timeElapsed = std::chrono::ceil<std::chrono::milliseconds>( sinceStart );
|
51becbde
Peter M. Groen
Committed the ent...
|
584
585
|
});
std::unique_lock<std::mutex> lck(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
586
|
|
51becbde
Peter M. Groen
Committed the ent...
|
587
588
589
|
// ("ClientPaho", "%1 waitForCompletion - pending operations : %2", m_clientId, m_pendingOperations);
m_operationsCompleteCV.wait_for(lck, waitFor, [this, &tokens]()
{
|
ca0cf29e
Steven
syntax fixes, con...
|
590
|
if( tokens.empty() )
|
51becbde
Peter M. Groen
Committed the ent...
|
591
592
593
|
{ // wait for all operations to end
return m_pendingOperations.empty();
}
|
ca0cf29e
Steven
syntax fixes, con...
|
594
|
else if( tokens.size() == 1 )
|
51becbde
Peter M. Groen
Committed the ent...
|
595
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
596
|
return m_pendingOperations.find( *tokens.cbegin() ) == m_pendingOperations.end();
|
51becbde
Peter M. Groen
Committed the ent...
|
597
598
599
600
601
602
603
604
605
|
}
std::vector<std::int32_t> intersect{};
std::set_intersection(m_pendingOperations.begin(), m_pendingOperations.end(), tokens.begin(), tokens.end(), std::back_inserter(intersect));
return intersect.empty();
} );
}
return timeElapsed;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
606
|
bool ClientPaho::isOverlapping( const std::string& topic ) const
|
51becbde
Peter M. Groen
Committed the ent...
|
607
608
|
{
std::string existingTopic{};
|
ca0cf29e
Steven
syntax fixes, con...
|
609
|
return isOverlapping( topic, existingTopic );
|
51becbde
Peter M. Groen
Committed the ent...
|
610
611
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
612
|
bool ClientPaho::isOverlapping( const std::string& topic, std::string& existingTopic ) const
|
51becbde
Peter M. Groen
Committed the ent...
|
613
614
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
615
|
return isOverlappingInternal( topic, existingTopic );
|
51becbde
Peter M. Groen
Committed the ent...
|
616
617
618
619
620
621
622
|
}
/// @return A snapshot (taken under the lock) of the tokens of all pending operations.
std::vector<std::int32_t> ClientPaho::pendingOperations() const
{
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    return std::vector<std::int32_t>( m_pendingOperations.begin(), m_pendingOperations.end() );
}
// Reports whether m_pendingSubscriptions currently holds at least one entry.
bool ClientPaho::hasPendingSubscriptions() const
{
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    const bool nonePending = m_pendingSubscriptions.empty();
    return !nonePending;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
633
|
// Looks up the recorded result for an operation token.
// @param token  Token of the operation to look up in m_operationResult.
// @return An empty optional when no result is recorded for the token,
//         otherwise the stored boolean result.
boost::optional<bool> ClientPaho::operationResult( std::int32_t token ) const
{
    OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
    boost::optional<bool> outcome{};
    const auto found = m_operationResult.find( token );
    if( found == m_operationResult.end() )
    {
        return outcome;
    }
    outcome = found->second;
    return outcome;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
645
|
void ClientPaho::parseEndpoint( const std::string& _endpoint )
|
51becbde
Peter M. Groen
Committed the ent...
|
646
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
647
648
|
auto ep = UriParser::parse( _endpoint );
if( ep.find( "user" ) != ep.end() )
|
51becbde
Peter M. Groen
Committed the ent...
|
649
650
651
652
653
|
{
m_username = ep["user"];
ep["user"].clear();
}
|
ca0cf29e
Steven
syntax fixes, con...
|
654
|
if( ep.find( "password" ) != ep.end() )
|
51becbde
Peter M. Groen
Committed the ent...
|
655
656
657
658
|
{
m_password = ep["password"];
ep["password"].clear();
}
|
ca0cf29e
Steven
syntax fixes, con...
|
659
|
m_endpoint = UriParser::toString( ep );
|
51becbde
Peter M. Groen
Committed the ent...
|
660
661
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
662
|
std::int32_t ClientPaho::publishInternal( const MqttMessage& message, int qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
663
664
665
666
667
668
669
670
671
672
673
674
|
{
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &ClientPaho::onPublishSuccess;
opts.onFailure = &ClientPaho::onPublishFailure;
opts.context = this;
auto msg = message.toAsyncMessage();
msg.qos = qos;
// Need to lock the mutex because it is possible that the callback is faster than
// the insertion of the token into the pending operations.
// OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ed30280f
Steven
last changes.
|
675
|
auto rc = MQTTAsync_sendMessage( m_client, message.topic().c_str(), &msg, &opts );
|
ca0cf29e
Steven
syntax fixes, con...
|
676
|
if( MQTTASYNC_SUCCESS != rc )
|
51becbde
Peter M. Groen
Committed the ent...
|
677
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
678
|
LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " - publish on topic " + message.topic() + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
679
680
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
681
|
if( !m_pendingOperations.insert( opts.token ).second )
|
51becbde
Peter M. Groen
Committed the ent...
|
682
|
{
|
d557d523
Peter M. Groen
Fix deferred subs...
|
683
|
// LogDebug( "[ClientPaho::publishInterval]", std::string( m_clientId + " publishInternal - token " + std::to_string( opts.token ) + " already in use" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
684
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
685
|
m_operationResult.erase( opts.token );
|
51becbde
Peter M. Groen
Committed the ent...
|
686
687
688
|
return opts.token;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
689
|
std::int32_t ClientPaho::subscribeInternal( const std::string& topic, int qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
690
691
692
693
694
695
696
697
698
|
{
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = &ClientPaho::onSubscribeSuccess;
opts.onFailure = &ClientPaho::onSubscribeFailure;
opts.context = this;
// Need to lock the mutex because it is possible that the callback is faster than
// the insertion of the token into the pending operations.
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ed30280f
Steven
last changes.
|
699
|
auto rc = MQTTAsync_subscribe( m_client, topic.c_str(), qos, &opts );
|
51becbde
Peter M. Groen
Committed the ent...
|
700
701
|
if (MQTTASYNC_SUCCESS != rc)
{
|
ca0cf29e
Steven
syntax fixes, con...
|
702
703
|
m_pendingSubscriptions.erase( topic );
LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribtion on topic " + topic + " failed with code " + pahoAsyncErrorCodeToString( rc ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
704
705
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
706
|
if( !m_pendingOperations.insert( opts.token ).second )
|
51becbde
Peter M. Groen
Committed the ent...
|
707
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
708
|
LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " subscribe - token " + std::to_string( opts.token ) + " already in use" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
709
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
710
711
|
m_operationResult.erase( opts.token );
if( m_subscribeTokenToTopic.count( opts.token ) > 0 )
|
51becbde
Peter M. Groen
Committed the ent...
|
712
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
713
|
LogDebug( "[ClientPaho::subscribeInterval]", std::string( m_clientId + " - overwriting pending subscription on topic " + m_subscribeTokenToTopic[opts.token] + " with topic " + topic ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
714
715
716
717
718
|
}
m_subscribeTokenToTopic[opts.token] = topic;
return opts.token;
}
|
0c424e03
Steven
syntax fixes. WIP
|
719
|
// Stores the new connection status and, when it differs from the previous one
// and a status callback is installed, invokes that callback with the client id
// and the new status.
// @param status  The status to store in m_connectionStatus.
void ClientPaho::setConnectionStatus( ConnectionStatus status )
{
    LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - " ) );

    const ConnectionStatus previous = m_connectionStatus;
    m_connectionStatus = status;

    const bool unchanged = ( status == previous );
    if( unchanged || !m_connectionStatusCallback )
    {
        return;
    }

    LogDebug( "[ClientPaho::setConnectionStatus]", std::string( m_clientId + " - Calling m_connectionStatusCallback" ) );
    m_connectionStatusCallback( m_clientId, status );
}
|
0c424e03
Steven
syntax fixes. WIP
|
731
|
bool ClientPaho::isOverlappingInternal( const std::string& topic, std::string& existingTopic ) const
|
51becbde
Peter M. Groen
Committed the ent...
|
732
733
|
{
existingTopic.clear();
|
0c424e03
Steven
syntax fixes. WIP
|
734
|
for( const auto& s : m_pendingSubscriptions )
|
51becbde
Peter M. Groen
Committed the ent...
|
735
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
736
|
if( testForOverlap( s.first, topic ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
737
738
739
740
741
742
|
{
existingTopic = s.first;
return true;
}
}
|
0c424e03
Steven
syntax fixes. WIP
|
743
|
for( const auto& s : m_subscriptions )
|
51becbde
Peter M. Groen
Committed the ent...
|
744
|
{
|
ed30280f
Steven
last changes.
|
745
|
if( testForOverlap( s.first, topic ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
746
747
748
749
750
751
752
753
|
{
existingTopic = s.first;
return true;
}
}
return false;
}
|
ed30280f
Steven
last changes.
|
754
|
void ClientPaho::pushIncomingEvent( std::function<void()> ev )
|
51becbde
Peter M. Groen
Committed the ent...
|
755
|
{
|
ed30280f
Steven
last changes.
|
756
|
m_callbackEventQueue.push( ev );
|
51becbde
Peter M. Groen
Committed the ent...
|
757
758
759
760
|
}
void ClientPaho::callbackEventHandler()
{
|
ca0cf29e
Steven
syntax fixes, con...
|
761
|
LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - starting callback event handler" ) );
|
0c424e03
Steven
syntax fixes. WIP
|
762
763
|
for( ;; )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
764
|
std::vector<std::function<void()>> events;
|
0c424e03
Steven
syntax fixes. WIP
|
765
|
if( !m_callbackEventQueue.pop(events) )
|
51becbde
Peter M. Groen
Committed the ent...
|
766
767
768
769
|
{
break;
}
|
0c424e03
Steven
syntax fixes. WIP
|
770
|
for( const auto& ev : events )
|
51becbde
Peter M. Groen
Committed the ent...
|
771
772
|
{
ev();
|
51becbde
Peter M. Groen
Committed the ent...
|
773
774
|
}
}
|
ed30280f
Steven
last changes.
|
775
|
LogDebug( "[ClientPaho::callbackEventHandler]", std::string( m_clientId + " - leaving callback event handler" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
776
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
777
|
void ClientPaho::onConnectOnInstance( const std::string& cause )
|
51becbde
Peter M. Groen
Committed the ent...
|
778
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
779
|
(void) cause;
|
51becbde
Peter M. Groen
Committed the ent...
|
780
781
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ed30280f
Steven
last changes.
|
782
|
std::copy( m_subscriptions.begin(), m_subscriptions.end(), std::inserter( m_pendingSubscriptions, m_pendingSubscriptions.end() ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
783
784
785
786
|
m_subscriptions.clear();
m_processPendingPublishes = true; // all publishes are on hold until publishPending is called.
}
|
ed30280f
Steven
last changes.
|
787
|
setConnectionStatus( ConnectionStatus::Connected );
|
51becbde
Peter M. Groen
Committed the ent...
|
788
789
|
}
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
790
|
void ClientPaho::onConnectSuccessOnInstance()
|
51becbde
Peter M. Groen
Committed the ent...
|
791
|
{
|
d557d523
Peter M. Groen
Fix deferred subs...
|
792
793
|
LogDebug( "[ClientPaho::onConnectSuccessOnInstance]",
std::string( m_clientId + " - onConnectSuccessOnInstance triggered." ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
794
795
796
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
// Register the connect callback that is used in reconnect scenarios.
|
0c424e03
Steven
syntax fixes. WIP
|
797
798
|
auto rc = MQTTAsync_setConnected( m_client, this, &ClientPaho::onConnect );
if( MQTTASYNC_SUCCESS != rc )
|
51becbde
Peter M. Groen
Committed the ent...
|
799
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
800
|
LogError( "[ClientPaho::onConnectSuccessOnInstance]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
801
|
}
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
802
|
|
51becbde
Peter M. Groen
Committed the ent...
|
803
804
805
806
807
808
|
// For MQTTV5
//rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
//if (MQTTASYNC_SUCCESS != rc) {
// // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
//}
// ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
809
|
|
51becbde
Peter M. Groen
Committed the ent...
|
810
811
812
|
m_operationResult[-100] = true;
m_pendingOperations.erase(-100);
}
|
9421324b
Peter M. Groen
First fix on conn...
|
813
|
|
0c424e03
Steven
syntax fixes. WIP
|
814
815
|
setConnectionStatus( ConnectionStatus::Connected );
if( m_connectPromise )
|
51becbde
Peter M. Groen
Committed the ent...
|
816
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
817
|
LogDebug( "[ClientPaho::onConnectSuccessOnInstance]", std::string("connectPromise still present. Resetting!" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
818
819
820
821
822
|
m_connectPromise->set_value();
}
m_operationsCompleteCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
823
|
void ClientPaho::onConnectFailureOnInstance( const MqttFailure& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
824
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
825
826
|
(void) response;
LogDebug( "[ClientPaho::onConnectFailureOnInstance]", std::string( "onConnectFailureOnInstance" + m_clientId + " - connection failed with code " + response.codeToString() + " (" + response.message() + ")"));
|
51becbde
Peter M. Groen
Committed the ent...
|
827
828
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
51becbde
Peter M. Groen
Committed the ent...
|
829
830
831
832
|
// ("ClientPaho", "onConnectFailureOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
m_operationResult[-100] = false;
m_pendingOperations.erase(-100);
}
|
ca0cf29e
Steven
syntax fixes, con...
|
833
|
if( ConnectionStatus::ConnectInProgress == m_connectionStatus )
|
51becbde
Peter M. Groen
Committed the ent...
|
834
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
835
|
setConnectionStatus( ConnectionStatus::Disconnected );
|
51becbde
Peter M. Groen
Committed the ent...
|
836
837
838
839
840
841
842
843
844
|
}
m_operationsCompleteCV.notify_all();
}
//void ClientPaho::onDisconnectOnInstance(enum MQTTReasonCodes reasonCode)
//{
// MLOGIC_COMMON_INFO("ClientPaho", "onDisconnectOnInstance %1 - disconnect (reason %2)", MQTTReasonCode_toString(reasonCode));
//}
|
ca0cf29e
Steven
syntax fixes, con...
|
845
|
void ClientPaho::onDisconnectSuccessOnInstance( const MqttSuccess& )
|
51becbde
Peter M. Groen
Committed the ent...
|
846
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
847
|
LogDebug( "[ClientPaho::onDisconnectSuccessOnInstance]", std::string( m_clientId + " - disconnected from endpoint " + m_endpoint ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
848
849
850
851
852
853
854
855
856
857
858
859
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
m_subscriptions.clear();
m_pendingSubscriptions.clear();
m_subscribeTokenToTopic.clear();
m_unsubscribeTokenToTopic.clear();
// ("ClientPaho", "onDisconnectSuccessOnInstance %1 - pending operations : %2, removing all operations", m_clientId, m_pendingOperations);
m_operationResult[-200] = true;
m_pendingOperations.clear();
}
|
0c424e03
Steven
syntax fixes. WIP
|
860
|
setConnectionStatus( ConnectionStatus::Disconnected );
|
51becbde
Peter M. Groen
Committed the ent...
|
861
|
|
0c424e03
Steven
syntax fixes. WIP
|
862
863
|
if( m_disconnectPromise )
{
|
51becbde
Peter M. Groen
Committed the ent...
|
864
865
866
867
868
|
m_disconnectPromise->set_value();
}
m_operationsCompleteCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
869
|
void ClientPaho::onDisconnectFailureOnInstance( const MqttFailure& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
870
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
871
|
(void) response;
|
ca0cf29e
Steven
syntax fixes, con...
|
872
|
LogDebug( "[ClientPaho::onDisconnectFailureOnInstance]", std::string( m_clientId + " - disconnect failed with code " + response.codeToString() + " ( " + response.message() + " ) " ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
873
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
874
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
51becbde
Peter M. Groen
Committed the ent...
|
875
876
877
878
879
|
// ("ClientPaho", "onDisconnectFailureOnInstance %1 - pending operations : %2, removing operation -200", m_clientId, m_pendingOperations);
m_operationResult[-200] = false;
m_pendingOperations.erase(-200);
}
|
0c424e03
Steven
syntax fixes. WIP
|
880
|
if( MQTTAsync_isConnected( m_client ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
881
|
{
|
0c424e03
Steven
syntax fixes. WIP
|
882
|
setConnectionStatus( ConnectionStatus::Connected );
|
51becbde
Peter M. Groen
Committed the ent...
|
883
884
885
|
}
else
{
|
0c424e03
Steven
syntax fixes. WIP
|
886
|
setConnectionStatus( ConnectionStatus::Disconnected );
|
51becbde
Peter M. Groen
Committed the ent...
|
887
888
|
}
|
0c424e03
Steven
syntax fixes. WIP
|
889
|
if( m_disconnectPromise )
|
51becbde
Peter M. Groen
Committed the ent...
|
890
891
892
893
894
895
|
{
m_disconnectPromise->set_value();
}
m_operationsCompleteCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
896
|
void ClientPaho::onPublishSuccessOnInstance( const MqttSuccess& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
897
898
|
{
auto pd = response.publishData();
|
d557d523
Peter M. Groen
Fix deferred subs...
|
899
|
// LogDebug( "[ClientPaho::onPublishSuccessOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " succeeded ( message was " + pd.payload() + " )" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
900
901
902
903
904
905
906
907
908
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
// ("ClientPaho", "onPublishSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
m_operationResult[response.token()] = true;
m_pendingOperations.erase(response.token());
}
m_operationsCompleteCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
909
|
void ClientPaho::onPublishFailureOnInstance( const MqttFailure& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
910
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
911
|
LogDebug( "[ClientPaho::onPublishFailureOnInstance]", std::string( m_clientId + " - publish with token " + std::to_string( response.token() ) + " failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
912
913
914
915
916
917
918
919
920
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
// ("ClientPaho", "onPublishFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
m_operationResult[response.token()] = false;
m_pendingOperations.erase(response.token());
}
m_operationsCompleteCV.notify_all();
}
|
0c424e03
Steven
syntax fixes. WIP
|
921
|
void ClientPaho::onSubscribeSuccessOnInstance( const MqttSuccess& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
922
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
923
924
925
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscribe with token " + std::to_string( response.token() ) + "succeeded" ) );
OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); });
|
51becbde
Peter M. Groen
Committed the ent...
|
926
927
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
bool operationOk = false;
|
ca0cf29e
Steven
syntax fixes, con...
|
928
|
OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response, &operationOk]()
|
51becbde
Peter M. Groen
Committed the ent...
|
929
930
931
|
{
// ("ClientPaho", "onSubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
m_operationResult[response.token()] = operationOk;
|
ca0cf29e
Steven
syntax fixes, con...
|
932
|
m_pendingOperations.erase( response.token() );
|
51becbde
Peter M. Groen
Committed the ent...
|
933
|
});
|
ca0cf29e
Steven
syntax fixes, con...
|
934
935
|
auto it = m_subscribeTokenToTopic.find( response.token() );
if( m_subscribeTokenToTopic.end() == it )
|
0c424e03
Steven
syntax fixes. WIP
|
936
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
937
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
938
939
940
941
942
|
return;
}
auto topic = it->second;
m_subscribeTokenToTopic.erase(it);
|
ca0cf29e
Steven
syntax fixes, con...
|
943
944
|
auto pendingIt = m_pendingSubscriptions.find( topic );
if( m_pendingSubscriptions.end() == pendingIt )
|
51becbde
Peter M. Groen
Committed the ent...
|
945
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
946
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
947
948
|
return;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
949
|
if( response.qos() != pendingIt->second.qos )
|
51becbde
Peter M. Groen
Committed the ent...
|
950
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
951
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - subscription requested qos " + std::to_string( pendingIt->second.qos ) + " , endpoint assigned qos " + std::to_string( response.qos() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
952
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
953
954
955
956
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - move pending subscription on topic " + topic + " to the registered subscriptions" ) );
m_subscriptions.emplace( std::make_pair( pendingIt->first, std::move( pendingIt->second ) ) );
m_pendingSubscriptions.erase( pendingIt );
|
51becbde
Peter M. Groen
Committed the ent...
|
957
958
959
|
operationOk = true;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
960
|
void ClientPaho::onSubscribeFailureOnInstance( const MqttFailure& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
961
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
962
963
964
|
LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
965
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
966
|
OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
|
51becbde
Peter M. Groen
Committed the ent...
|
967
968
969
|
{
// MLOGIC_COMMON_DEBUG("ClientPaho", "onSubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
m_operationResult[response.token()] = false;
|
ca0cf29e
Steven
syntax fixes, con...
|
970
|
m_pendingOperations.erase( response.token() );
|
51becbde
Peter M. Groen
Committed the ent...
|
971
972
|
});
|
ca0cf29e
Steven
syntax fixes, con...
|
973
974
|
auto it = m_subscribeTokenToTopic.find( response.token() );
if( m_subscribeTokenToTopic.end() == it )
|
51becbde
Peter M. Groen
Committed the ent...
|
975
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
976
|
LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
977
978
979
|
return;
}
auto topic = it->second;
|
ca0cf29e
Steven
syntax fixes, con...
|
980
|
m_subscribeTokenToTopic.erase( it );
|
51becbde
Peter M. Groen
Committed the ent...
|
981
|
|
ca0cf29e
Steven
syntax fixes, con...
|
982
983
|
auto pendingIt = m_pendingSubscriptions.find( topic );
if( m_pendingSubscriptions.end() == pendingIt )
|
51becbde
Peter M. Groen
Committed the ent...
|
984
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
985
|
LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - cannot find pending subscription for token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
986
987
|
return;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
988
989
|
LogDebug( "[ClientPaho::onSubscribeFailureOnInstance]", std::string( m_clientId + " - remove pending subscription on topic " + topic ) );
m_pendingSubscriptions.erase( pendingIt );
|
51becbde
Peter M. Groen
Committed the ent...
|
990
991
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
992
|
void ClientPaho::onUnsubscribeSuccessOnInstance( const MqttSuccess& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
993
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
994
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unsubscribe with token " + std::to_string( response.token() ) + " succeeded " ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
995
|
|
ca0cf29e
Steven
syntax fixes, con...
|
996
|
OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
997
998
999
1000
1001
1002
1003
1004
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
// On centos7 the unsubscribe response is a nullptr, so we do not have a valid token.
// As a workaround the last unsubscribe token is stored and is used when no valid token is available.
// This is by no means bullet proof because rapid unsubscribes in succession will overwrite this member
// before the callback on the earlier unsubscribe has arrived. On centos7 the unsubscribes have to be handled
// sequentially (see ClientPaho::unsubscribe)!
auto token = response.token();
|
ca0cf29e
Steven
syntax fixes, con...
|
1005
|
if( -1 == token )
|
51becbde
Peter M. Groen
Committed the ent...
|
1006
1007
1008
1009
1010
1011
|
{
token = m_lastUnsubscribe;
m_lastUnsubscribe = -1;
}
bool operationOk = false;
|
ca0cf29e
Steven
syntax fixes, con...
|
1012
|
OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, token, &operationOk]()
|
51becbde
Peter M. Groen
Committed the ent...
|
1013
1014
1015
|
{
// ("ClientPaho", "onUnsubscribeSuccessOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, token);
m_operationResult[token] = operationOk;
|
ca0cf29e
Steven
syntax fixes, con...
|
1016
|
m_pendingOperations.erase( token );
|
51becbde
Peter M. Groen
Committed the ent...
|
1017
1018
|
});
|
ca0cf29e
Steven
syntax fixes, con...
|
1019
1020
|
auto it = m_unsubscribeTokenToTopic.find( token );
if( m_unsubscribeTokenToTopic.end() == it )
|
51becbde
Peter M. Groen
Committed the ent...
|
1021
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1022
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( token ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
1023
1024
1025
|
return;
}
auto topic = it->second;
|
ca0cf29e
Steven
syntax fixes, con...
|
1026
|
m_unsubscribeTokenToTopic.erase( it );
|
51becbde
Peter M. Groen
Committed the ent...
|
1027
|
|
ca0cf29e
Steven
syntax fixes, con...
|
1028
1029
1030
1031
|
auto registeredIt = m_subscriptions.find( topic );
if( m_subscriptions.end() == registeredIt )
{
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - cannot find subscription for token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
1032
1033
|
return;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1034
1035
1036
|
LogDebug( "[ClientPaho::onSubscribeSuccessOnInstance]", std::string( m_clientId + " - remove subscription on topic " + topic + " from the registered subscriptions" ) );
m_subscriptions.erase( registeredIt );
|
51becbde
Peter M. Groen
Committed the ent...
|
1037
1038
1039
|
operationOk = true;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1040
|
void ClientPaho::onUnsubscribeFailureOnInstance( const MqttFailure& response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1041
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1042
1043
|
LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - subscription failed with code " + response.codeToString() + " ( " + response.message() + " )" ) );
OSDEV_COMPONENTS_SCOPEGUARD( m_operationsCompleteCV, [this]() { m_operationsCompleteCV.notify_all(); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1044
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
1045
|
OSDEV_COMPONENTS_SCOPEGUARD( m_pendingOperations, [this, &response]()
|
51becbde
Peter M. Groen
Committed the ent...
|
1046
1047
1048
|
{
// ("ClientPaho", "onUnsubscribeFailureOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, response.token());
m_operationResult[response.token()] = false;
|
ca0cf29e
Steven
syntax fixes, con...
|
1049
|
m_pendingOperations.erase( response.token() );
|
51becbde
Peter M. Groen
Committed the ent...
|
1050
1051
|
});
|
ca0cf29e
Steven
syntax fixes, con...
|
1052
1053
|
auto it = m_unsubscribeTokenToTopic.find( response.token() );
if( m_unsubscribeTokenToTopic.end() == it )
|
51becbde
Peter M. Groen
Committed the ent...
|
1054
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1055
|
LogDebug( "[ClientPaho::onUnsubscribeFailureOnInstance]", std::string( m_clientId + " - unknown token " + std::to_string( response.token() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
1056
1057
1058
|
return;
}
auto topic = it->second;
|
ca0cf29e
Steven
syntax fixes, con...
|
1059
|
m_unsubscribeTokenToTopic.erase( it );
|
51becbde
Peter M. Groen
Committed the ent...
|
1060
1061
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1062
|
int ClientPaho::onMessageArrivedOnInstance( const MqttMessage& message )
|
51becbde
Peter M. Groen
Committed the ent...
|
1063
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1064
|
LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - received message on topic " + message.topic() + ", retained : " + std::to_string( message.retained() ) + ", dup : " + std::to_string( message.duplicate() ) ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
1065
|
|
ca0cf29e
Steven
syntax fixes, con...
|
1066
|
std::function<void( MqttMessage )> cb;
|
51becbde
Peter M. Groen
Committed the ent...
|
1067
1068
|
{
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
|
ca0cf29e
Steven
syntax fixes, con...
|
1069
|
for( const auto& s : m_subscriptions )
|
51becbde
Peter M. Groen
Committed the ent...
|
1070
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1071
|
if( boost::regex_match( message.topic(), s.second.topicRegex ) )
|
51becbde
Peter M. Groen
Committed the ent...
|
1072
1073
1074
1075
1076
1077
|
{
cb = s.second.callback;
}
}
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1078
|
if( cb )
|
51becbde
Peter M. Groen
Committed the ent...
|
1079
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1080
|
cb( message );
|
51becbde
Peter M. Groen
Committed the ent...
|
1081
1082
1083
|
}
else
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1084
|
LogDebug( "[ClientPaho::onMessageArrivedOnInstance]", std::string( m_clientId + " - no tpic filter found for message received on topic " + message.topic() ) );
|
51becbde
Peter M. Groen
Committed the ent...
|
1085
1086
1087
1088
|
}
return 1;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1089
|
// Handles a delivery-complete notification on the instance by forwarding the
// client id and token to m_deliveryCompleteCallback, when one is installed.
// @param token  Token of the delivered message.
void ClientPaho::onDeliveryCompleteOnInstance( MQTTAsync_token token )
{
    LogDebug( "[ClientPaho::onDeliveryCompleteOnInstance]", std::string( m_clientId + " - message with token " + std::to_string( token ) + " is delivered" ) );

    if( !m_deliveryCompleteCallback )
    {
        return;
    }

    const auto deliveredToken = static_cast<std::int32_t>( token );
    m_deliveryCompleteCallback( m_clientId, deliveredToken );
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1098
|
void ClientPaho::onConnectionLostOnInstance( const std::string& cause )
|
51becbde
Peter M. Groen
Committed the ent...
|
1099
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1100
|
(void) cause;
|
51becbde
Peter M. Groen
Committed the ent...
|
1101
|
// ("ClientPaho", "onConnectionLostOnInstance %1 - connection lost (%2)", m_clientId, cause);
|
ca0cf29e
Steven
syntax fixes, con...
|
1102
|
setConnectionStatus( ConnectionStatus::ReconnectInProgress );
|
51becbde
Peter M. Groen
Committed the ent...
|
1103
1104
1105
|
OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
// Remove all tokens related to subscriptions from the active operations.
|
ca0cf29e
Steven
syntax fixes, con...
|
1106
|
for( const auto& p : m_subscribeTokenToTopic )
|
51becbde
Peter M. Groen
Committed the ent...
|
1107
1108
|
{
// ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
|
ca0cf29e
Steven
syntax fixes, con...
|
1109
|
m_pendingOperations.erase( p.first );
|
51becbde
Peter M. Groen
Committed the ent...
|
1110
1111
|
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1112
|
for( const auto& p : m_unsubscribeTokenToTopic )
|
51becbde
Peter M. Groen
Committed the ent...
|
1113
1114
|
{
// ("ClientPaho", "onConnectionLostOnInstance %1 - pending operations : %2, removing operation %3", m_clientId, m_pendingOperations, p.first);
|
ca0cf29e
Steven
syntax fixes, con...
|
1115
|
m_pendingOperations.erase( p.first );
|
51becbde
Peter M. Groen
Committed the ent...
|
1116
1117
1118
1119
1120
1121
1122
|
}
// Clear the administration used in the subscribe process.
m_subscribeTokenToTopic.clear();
m_unsubscribeTokenToTopic.clear();
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1123
|
void ClientPaho::onFirstConnect( void* context, char* cause )
|
9421324b
Peter M. Groen
First fix on conn...
|
1124
1125
|
{
LogInfo( "[ClientPaho::onFirstConnect]", "onFirstConnect triggered.." );
|
ca0cf29e
Steven
syntax fixes, con...
|
1126
|
if( context )
|
9421324b
Peter M. Groen
First fix on conn...
|
1127
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1128
1129
1130
|
auto *cl = reinterpret_cast<ClientPaho*>( context );
std::string reason( nullptr == cause ? "Unknown cause" : cause );
cl->pushIncomingEvent( [cl, reason]() { cl->onConnectSuccessOnInstance(); } );
|
9421324b
Peter M. Groen
First fix on conn...
|
1131
1132
1133
|
}
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1134
|
void ClientPaho::onConnect( void* context, char* cause )
|
51becbde
Peter M. Groen
Committed the ent...
|
1135
|
{
|
9421324b
Peter M. Groen
First fix on conn...
|
1136
|
LogInfo( "[ClientPaho::onConnect]", "onConnect triggered.." );
|
ca0cf29e
Steven
syntax fixes, con...
|
1137
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1138
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1139
1140
1141
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
std::string reason( nullptr == cause ? "unknown cause" : cause );
cl->pushIncomingEvent( [cl, reason]() { cl->onConnectOnInstance( reason ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1142
1143
1144
1145
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1146
|
void ClientPaho::onConnectSuccess( void* context, MQTTAsync_successData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1147
|
{
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
1148
|
LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." );
|
ca0cf29e
Steven
syntax fixes, con...
|
1149
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1150
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1151
1152
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
if( !response )
|
76d01373
Peter M. Groen
Fix on connection
|
1153
|
{
|
51becbde
Peter M. Groen
Committed the ent...
|
1154
|
// connect should always have a valid response struct.
|
ca0cf29e
Steven
syntax fixes, con...
|
1155
|
LogError( "[ClientPaho::onConnectSuccess]", "onConnectSuccess - no response data" );
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
1156
|
return;
|
51becbde
Peter M. Groen
Committed the ent...
|
1157
|
}
|
2c0c99a5
Peter M. Groen
Fix connect Callb...
|
1158
|
// MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
|
ca0cf29e
Steven
syntax fixes, con...
|
1159
|
cl->pushIncomingEvent( [cl]() { cl->onConnectSuccessOnInstance(); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1160
1161
1162
1163
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1164
|
void ClientPaho::onConnectFailure( void* context, MQTTAsync_failureData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1165
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1166
1167
|
LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ) );
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1168
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1169
1170
1171
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttFailure resp( response );
cl->pushIncomingEvent( [cl, resp]() { cl->onConnectFailureOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
|
}
}
//// static
//void ClientPaho::onDisconnect(void* context, MQTTProperties* properties, enum MQTTReasonCodes reasonCode)
//{
// apply_unused_parameters(properties);
// try {
// if (context) {
// auto* cl = reinterpret_cast<ClientPaho*>(context);
// cl->pushIncomingEvent([cl, reasonCode]() { cl->onDisconnectOnInstance(reasonCode); });
// }
// }
// catch (...) {
// }
// catch (const std::exception& e) {
// MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - exception : %1", e.what());
// }
// catch (...) {
// MLOGIC_COMMON_ERROR("ClientPaho", "onDisconnect - unknown exception");
// }
//}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1196
|
void ClientPaho::onDisconnectSuccess( void* context, MQTTAsync_successData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1197
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1198
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1199
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1200
1201
1202
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttSuccess resp( response ? response->token : 0 );
cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectSuccessOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1203
1204
1205
1206
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1207
|
void ClientPaho::onDisconnectFailure( void* context, MQTTAsync_failureData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1208
|
{
|
9421324b
Peter M. Groen
First fix on conn...
|
1209
|
LogInfo( "[ClientPaho::onDisconnectFailure]", "onDisconnectFailure triggered.." );
|
ca0cf29e
Steven
syntax fixes, con...
|
1210
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1211
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1212
1213
1214
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttFailure resp( response );
cl->pushIncomingEvent( [cl, resp]() { cl->onDisconnectFailureOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1215
1216
1217
1218
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1219
|
void ClientPaho::onPublishSuccess( void* context, MQTTAsync_successData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1220
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1221
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1222
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1223
1224
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
if( !response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
|
{
// publish should always have a valid response struct.
// toLogFile ("ClientPaho", "onPublishSuccess - no response data");
}
MqttSuccess resp(response->token, MqttMessage(response->alt.pub.destinationName == nullptr ? "null" : response->alt.pub.destinationName, response->alt.pub.message));
cl->pushIncomingEvent([cl, resp]() { cl->onPublishSuccessOnInstance(resp); });
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1235
|
void ClientPaho::onPublishFailure( void* context, MQTTAsync_failureData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1236
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1237
1238
|
(void) response;
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1239
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1240
1241
1242
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttFailure resp( response );
cl->pushIncomingEvent( [cl, resp]() { cl->onPublishFailureOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1243
1244
1245
1246
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1247
|
void ClientPaho::onSubscribeSuccess( void* context, MQTTAsync_successData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1248
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1249
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1250
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1251
1252
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
if( !response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1253
1254
1255
1256
|
{
// subscribe should always have a valid response struct.
// MLOGIC_COMMON_FATAL("ClientPaho", "onSubscribeSuccess - no response data");
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1257
1258
|
MqttSuccess resp( response->token, response->alt.qos );
cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeSuccessOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1259
1260
1261
1262
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1263
|
void ClientPaho::onSubscribeFailure( void* context, MQTTAsync_failureData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1264
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1265
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1266
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1267
1268
1269
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttFailure resp( response );
cl->pushIncomingEvent( [cl, resp]() { cl->onSubscribeFailureOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1270
1271
1272
1273
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1274
|
void ClientPaho::onUnsubscribeSuccess( void* context, MQTTAsync_successData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1275
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1276
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1277
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1278
1279
1280
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttSuccess resp( response ? response->token : -1 );
cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeSuccessOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1281
1282
1283
1284
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1285
|
void ClientPaho::onUnsubscribeFailure( void* context, MQTTAsync_failureData* response )
|
51becbde
Peter M. Groen
Committed the ent...
|
1286
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1287
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1288
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1289
1290
1291
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttFailure resp( response );
cl->pushIncomingEvent( [cl, resp]() { cl->onUnsubscribeFailureOnInstance( resp ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1292
1293
1294
1295
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1296
|
int ClientPaho::onMessageArrived( void* context, char* topicName, int, MQTTAsync_message* message )
|
51becbde
Peter M. Groen
Committed the ent...
|
1297
1298
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1299
|
OSDEV_COMPONENTS_SCOPEGUARD( freeMessage, [&topicName, &message]()
|
51becbde
Peter M. Groen
Committed the ent...
|
1300
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1301
1302
|
MQTTAsync_freeMessage( &message );
MQTTAsync_free( topicName );
|
51becbde
Peter M. Groen
Committed the ent...
|
1303
1304
|
});
|
ca0cf29e
Steven
syntax fixes, con...
|
1305
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1306
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1307
1308
1309
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
MqttMessage msg( topicName, *message );
cl->pushIncomingEvent( [cl, msg]() { cl->onMessageArrivedOnInstance( msg ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1310
1311
1312
1313
1314
1315
|
}
return 1; // always return true. Otherwise this callback is triggered again.
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1316
|
void ClientPaho::onDeliveryComplete( void* context, MQTTAsync_token token )
|
51becbde
Peter M. Groen
Committed the ent...
|
1317
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1318
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1319
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1320
1321
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
cl->pushIncomingEvent( [cl, token]() { cl->onDeliveryCompleteOnInstance( token ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1322
1323
1324
1325
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1326
|
void ClientPaho::onConnectionLost( void* context, char* cause )
|
51becbde
Peter M. Groen
Committed the ent...
|
1327
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1328
|
OSDEV_COMPONENTS_SCOPEGUARD( freeCause, [&cause]()
|
51becbde
Peter M. Groen
Committed the ent...
|
1329
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1330
|
if( cause )
|
51becbde
Peter M. Groen
Committed the ent...
|
1331
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1332
|
MQTTAsync_free( cause );
|
51becbde
Peter M. Groen
Committed the ent...
|
1333
1334
1335
|
}
});
|
ca0cf29e
Steven
syntax fixes, con...
|
1336
|
if( context )
|
51becbde
Peter M. Groen
Committed the ent...
|
1337
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1338
1339
1340
|
auto* cl = reinterpret_cast<ClientPaho*>( context );
std::string msg( nullptr == cause ? "cause unknown" : cause );
cl->pushIncomingEvent( [cl, msg]() { cl->onConnectionLostOnInstance( msg ); } );
|
51becbde
Peter M. Groen
Committed the ent...
|
1341
1342
1343
1344
|
}
}
// static
|
ca0cf29e
Steven
syntax fixes, con...
|
1345
|
void ClientPaho::onLogPaho( enum MQTTASYNC_TRACE_LEVELS level, char* message )
|
51becbde
Peter M. Groen
Committed the ent...
|
1346
|
{
|
ca0cf29e
Steven
syntax fixes, con...
|
1347
1348
|
(void) message;
switch( level )
|
51becbde
Peter M. Groen
Committed the ent...
|
1349
1350
1351
|
{
case MQTTASYNC_TRACE_MAXIMUM:
case MQTTASYNC_TRACE_MEDIUM:
|
ca0cf29e
Steven
syntax fixes, con...
|
1352
1353
|
case MQTTASYNC_TRACE_MINIMUM:
{
|
51becbde
Peter M. Groen
Committed the ent...
|
1354
1355
1356
|
// ("ClientPaho", "paho - %1", message)
break;
}
|
ca0cf29e
Steven
syntax fixes, con...
|
1357
1358
|
case MQTTASYNC_TRACE_PROTOCOL:
{
|
51becbde
Peter M. Groen
Committed the ent...
|
1359
1360
1361
1362
1363
|
// ("ClientPaho", "paho - %1", message)
break;
}
case MQTTASYNC_TRACE_ERROR:
case MQTTASYNC_TRACE_SEVERE:
|
ca0cf29e
Steven
syntax fixes, con...
|
1364
1365
|
case MQTTASYNC_TRACE_FATAL:
{
|
51becbde
Peter M. Groen
Committed the ent...
|
1366
1367
1368
1369
1370
|
// ("ClientPaho", "paho - %1", message)
break;
}
}
}
|