Merged
Merge Request #8
·
created by
added LWT ( last will & testament ) option to mqtt connect.
added support for a last will & testament message. LWT settings are not required for connection, but can be provided as optional parameter when calling the mqttclient connect function. ( see publisher test case ). The parameters to be added are the topic to publish on and the message to be published in case of a connection disruption.
-
agreed, changed it to lwt_topic and lwt_message.
-
agreed, changed it to lwt_topic and lwt_message.
-
yep, minor typo. has been fixed.
-
yep, minor typo. has been fixed.
-
yep, minor typo. has been fixed.
-
Please fix the comments on the code and add the following :
- If a connection was made and the connection is lost ( e.g. the network connection is disabled ) the LWT needs to get published.
- If the connection is restored, make sure data is being send again.
- Again, disable the network connection and check to see if the LWT is again being published. If not, make sure it gets re-issued if a connection is restored. The LWT needs to be stored in a member-variable of clientpaho...
-
fixed the code relevant to the comments. looking into the connection issues now.
-
LWT is correctly published on connection interruption. date is picked up again once connection is restored and LWT is also published again once interrupted again.
-
Tested and looking good. Merging to Development and Master
-
mentioned in commit b4d6ed568a3652ab9a0078f2eb6e976d1497d898
-
Status changed to merged
29 | 29 | |
30 | 30 | } |
31 | 31 | |
32 | -void Publisher::connect( const std::string &hostname, int portnumber, const std::string &username, const std::string &password ) | |
32 | +void Publisher::connect(const std::string &hostname, int portnumber, const std::string &username, const std::string &password, const std::string &lwt_topic, const std::string &lwt_message) | |
1 |
|
36 | 36 | |
37 | 37 | virtual ~Publisher() {} |
38 | 38 | |
39 | - void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string() ); | |
39 | + void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string() | |
40 | + , const std::string &lwt_topic = std::string(), const std::string &lwt_message = std::string() ); | |
1 |
|
59 | 60 | * @param port The port to use. |
60 | 61 | * @param credentials The credentials to use. |
61 | 62 | */ |
62 | - virtual void connect(const std::string& host, int port, const Credentials& credentials) = 0; | |
63 | + virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; | |
1 |
|
59 | 60 | * @param port The port to use. |
60 | 61 | * @param credentials The credentials to use. |
61 | 62 | */ |
62 | - virtual void connect(const std::string& host, int port, const Credentials& credentials) = 0; | |
63 | + virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; | |
63 | 64 | |
64 | 65 | /** |
65 | 66 | * @brief Connect to the endpoint |
66 | 67 | * @param endpoint an uri endpoint description. |
67 | 68 | */ |
68 | - virtual void connect(const std::string& endpoint) = 0; | |
69 | + virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; | |
1 |
|
91 | 92 | /** |
92 | 93 | * @see IMqttClient |
93 | 94 | */ |
94 | - virtual void connect(const std::string& host, int port, const Credentials& credentials) override; | |
95 | + virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT() ) override; | |
1 |
|
... | ... | @@ -78,7 +78,7 @@ int main( int argc, char* argv[] ) |
78 | 78 | { |
79 | 79 | std::cout << "{OK}" << std::endl; |
80 | 80 | std::cout << "Connecting to the broker : "; |
81 | - pPublisher->connect( "localhost", 1883, "", "" ); | |
81 | + pPublisher->connect( "localhost", 1883, "", "", "LWT-test", "connection disrupted.." ); | |
82 | 82 | |
83 | 83 | // Assume we are connected now, start publishing. |
84 | 84 | while( 1 ) |
... | ... | @@ -94,7 +94,7 @@ int main( int argc, char* argv[] ) |
94 | 94 | } |
95 | 95 | messageNumber++; |
96 | 96 | } |
97 | - sleepcp( 1, T_MICRO ); | |
97 | + sleepcp( 5, T_SECONDS ); | |
98 | 98 | } |
99 | 99 | } |
100 | 100 | else | ... | ... |
... | ... | @@ -29,9 +29,9 @@ Publisher::Publisher() |
29 | 29 | |
30 | 30 | } |
31 | 31 | |
32 | -void Publisher::connect( const std::string &hostname, int portnumber, const std::string &username, const std::string &password ) | |
32 | +void Publisher::connect(const std::string &hostname, int portnumber, const std::string &username, const std::string &password, const std::string &lwt_topic, const std::string &lwt_message) | |
1 |
|
|
33 | 33 | { |
34 | - m_mqtt_client.connect( hostname, portnumber, osdev::components::mqtt::Credentials( username, password ) ); | |
34 | + m_mqtt_client.connect( hostname, portnumber, osdev::components::mqtt::Credentials( username, password ), osdev::components::mqtt::mqtt_LWT( lwt_topic, lwt_message ) ); | |
35 | 35 | std::cout << "Client state : " << m_mqtt_client.state() << std::endl; |
36 | 36 | } |
37 | 37 | ... | ... |
... | ... | @@ -36,7 +36,8 @@ public: |
36 | 36 | |
37 | 37 | virtual ~Publisher() {} |
38 | 38 | |
39 | - void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string() ); | |
39 | + void connect( const std::string &hostname, int portnumber = 1883, const std::string &username = std::string(), const std::string &password = std::string() | |
40 | + , const std::string &lwt_topic = std::string(), const std::string &lwt_message = std::string() ); | |
1 |
|
|
40 | 41 | |
41 | 42 | void publish( const std::string &message_topic, const std::string &message_payload ); |
42 | 43 | ... | ... |
... | ... | @@ -45,6 +45,7 @@ |
45 | 45 | #include "imqttclientimpl.h" |
46 | 46 | #include "mqttfailure.h" |
47 | 47 | #include "mqttsuccess.h" |
48 | +#include "mqtt_lwt.h" | |
48 | 49 | |
49 | 50 | namespace osdev { |
50 | 51 | namespace components { |
... | ... | @@ -94,7 +95,7 @@ public: |
94 | 95 | /** |
95 | 96 | * @see IMqttClientImpl |
96 | 97 | */ |
97 | - virtual std::int32_t connect(bool wait) override; | |
98 | + virtual std::int32_t connect(bool wait, const mqtt_LWT &lwt = mqtt_LWT() ) override; | |
98 | 99 | |
99 | 100 | /** |
100 | 101 | * @see IMqttClientImpl | ... | ... |
... | ... | @@ -39,6 +39,7 @@ |
39 | 39 | #include "credentials.h" |
40 | 40 | #include "mqttmessage.h" |
41 | 41 | #include "token.h" |
42 | +#include "mqtt_lwt.h" | |
42 | 43 | |
43 | 44 | namespace osdev { |
44 | 45 | namespace components { |
... | ... | @@ -59,13 +60,13 @@ public: |
59 | 60 | * @param port The port to use. |
60 | 61 | * @param credentials The credentials to use. |
61 | 62 | */ |
62 | - virtual void connect(const std::string& host, int port, const Credentials& credentials) = 0; | |
63 | + virtual void connect(const std::string& host, int port, const Credentials& credentials, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; | |
1 |
|
|
63 | 64 | |
64 | 65 | /** |
65 | 66 | * @brief Connect to the endpoint |
66 | 67 | * @param endpoint an uri endpoint description. |
67 | 68 | */ |
68 | - virtual void connect(const std::string& endpoint) = 0; | |
69 | + virtual void connect(const std::string& endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; | |
1 |
|
|
69 | 70 | |
70 | 71 | /** |
71 | 72 | * @brief Disconnect the client from the broker | ... | ... |
... | ... | @@ -36,6 +36,7 @@ |
36 | 36 | // mlogic::mqtt |
37 | 37 | #include "connectionstatus.h" |
38 | 38 | #include "mqttmessage.h" |
39 | +#include "mqtt_lwt.h" | |
39 | 40 | |
40 | 41 | namespace osdev { |
41 | 42 | namespace components { |
... | ... | @@ -65,7 +66,7 @@ public: |
65 | 66 | * @param wait A flag that indicates if the method should wait for a succesful connection. |
66 | 67 | * @return the operation token. |
67 | 68 | */ |
68 | - virtual std::int32_t connect(bool wait) = 0; | |
69 | + virtual std::int32_t connect( bool wait, const mqtt_LWT &lwt = mqtt_LWT() ) = 0; | |
69 | 70 | |
70 | 71 | /** |
71 | 72 | * @brief Disconnect the wrapper. | ... | ... |
1 | +/* **************************************************************************** | |
2 | + * Copyright 2019 Open Systems Development BV * | |
3 | + * * | |
4 | + * Permission is hereby granted, free of charge, to any person obtaining a * | |
5 | + * copy of this software and associated documentation files (the "Software"), * | |
6 | + * to deal in the Software without restriction, including without limitation * | |
7 | + * the rights to use, copy, modify, merge, publish, distribute, sublicense, * | |
8 | + * and/or sell copies of the Software, and to permit persons to whom the * | |
9 | + * Software is furnished to do so, subject to the following conditions: * | |
10 | + * * | |
11 | + * The above copyright notice and this permission notice shall be included in * | |
12 | + * all copies or substantial portions of the Software. * | |
13 | + * * | |
14 | + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * | |
15 | + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * | |
16 | + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * | |
17 | + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * | |
18 | + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * | |
19 | + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * | |
20 | + * DEALINGS IN THE SOFTWARE. * | |
21 | + * ***************************************************************************/ | |
22 | +#pragma once | |
23 | + | |
24 | +// std | |
25 | +#include <string> | |
26 | + | |
27 | +namespace osdev { | |
28 | +namespace components { | |
29 | +namespace mqtt { | |
30 | + | |
31 | +/*! | |
32 | + * \brief Class that holds LWT ( Last Will & Testament ) context. | |
33 | + */ | |
34 | +class mqtt_LWT | |
35 | +{ | |
36 | +public: | |
37 | + /*! | |
38 | + * \brief Default CTor, empty LWT | |
39 | + */ | |
40 | + mqtt_LWT() | |
41 | + : m_topic() | |
42 | + , m_message() | |
43 | + {} | |
44 | + | |
45 | + /*! | |
46 | + * \brief Constructor for LWT topic/message | |
47 | + * \param topic - The topic on which the LWT is published. | |
48 | + * \param message - The message published on broker disconnection. | |
49 | + */ | |
50 | + mqtt_LWT( const std::string &topic, const std::string &message ) | |
51 | + : m_topic( topic ) | |
52 | + , m_message( message ) | |
53 | + {} | |
54 | + | |
55 | + const std::string& topic() const { return m_topic; } | |
56 | + const std::string& message() const { return m_message; } | |
57 | + | |
58 | +private: | |
59 | + std::string m_topic; | |
60 | + std::string m_message; | |
61 | +}; | |
62 | + | |
63 | +} // End namespace mqtt | |
64 | +} // End namespace components | |
65 | +} // End namespace osdev | ... | ... |
... | ... | @@ -36,6 +36,7 @@ |
36 | 36 | #include "serverstate.h" |
37 | 37 | |
38 | 38 | #include "imqttclient.h" |
39 | +#include "mqtt_lwt.h" | |
39 | 40 | |
40 | 41 | namespace osdev { |
41 | 42 | namespace components { |
... | ... | @@ -91,12 +92,12 @@ public: |
91 | 92 | /** |
92 | 93 | * @see IMqttClient |
93 | 94 | */ |
94 | - virtual void connect(const std::string& host, int port, const Credentials& credentials) override; | |
95 | + virtual void connect( const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt = mqtt_LWT() ) override; | |
1 |
|
|
95 | 96 | |
96 | 97 | /** |
97 | 98 | * @see IMqttClient |
98 | 99 | */ |
99 | - virtual void connect(const std::string& endpoint) override; | |
100 | + virtual void connect( const std::string &endpoint, const mqtt_LWT &lwt = mqtt_LWT() ) override; | |
100 | 101 | |
101 | 102 | /** |
102 | 103 | * @see IMqttClient | ... | ... |
... | ... | @@ -182,7 +182,7 @@ ConnectionStatus ClientPaho::connectionStatus() const |
182 | 182 | return m_connectionStatus; |
183 | 183 | } |
184 | 184 | |
185 | -std::int32_t ClientPaho::connect(bool wait) | |
185 | +std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) | |
186 | 186 | { |
187 | 187 | { |
188 | 188 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); |
... | ... | @@ -200,6 +200,21 @@ std::int32_t ClientPaho::connect(bool wait) |
200 | 200 | conn_opts.onFailure = &ClientPaho::onConnectFailure; |
201 | 201 | conn_opts.context = this; |
202 | 202 | conn_opts.automaticReconnect = 1; |
203 | + | |
204 | + if( !lwt.topic().empty() ) | |
205 | + { | |
206 | + MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer; | |
207 | + will_opts.message = lwt.message().c_str(); | |
208 | + will_opts.topicName = lwt.topic().c_str(); | |
209 | + | |
210 | + conn_opts.will = &will_opts; | |
211 | + } | |
212 | + else | |
213 | + { | |
214 | + conn_opts.will = nullptr; | |
215 | + } | |
216 | + | |
217 | + | |
203 | 218 | if (!m_username.empty()) |
204 | 219 | { |
205 | 220 | conn_opts.username = m_username.c_str(); |
... | ... | @@ -451,7 +466,7 @@ void ClientPaho::resubscribe() |
451 | 466 | } |
452 | 467 | } |
453 | 468 | |
454 | -std::int32_t ClientPaho::unsubscribe(const std::string& topic, int qos) | |
469 | +std::int32_t ClientPaho::unsubscribe( const std::string& topic, int qos ) | |
455 | 470 | { |
456 | 471 | { |
457 | 472 | OSDEV_COMPONENTS_LOCKGUARD(m_mutex); | ... | ... |
... | ... | @@ -125,9 +125,8 @@ StateEnum MqttClient::state() const |
125 | 125 | return m_serverState.state(); |
126 | 126 | } |
127 | 127 | |
128 | -void MqttClient::connect(const std::string& host, int port, const Credentials& credentials) | |
128 | +void MqttClient::connect(const std::string& host, int port, const Credentials &credentials, const mqtt_LWT &lwt ) | |
129 | 129 | { |
130 | - | |
131 | 130 | osdev::components::mqtt::ParsedUri _endpoint = { |
132 | 131 | { "scheme", "tcp" }, |
133 | 132 | { "user", credentials.username() }, |
... | ... | @@ -135,10 +134,11 @@ void MqttClient::connect(const std::string& host, int port, const Credentials& c |
135 | 134 | { "host", host }, |
136 | 135 | { "port", std::to_string(port) } |
137 | 136 | }; |
138 | - this->connect(UriParser::toString(_endpoint)); | |
137 | + | |
138 | + this->connect( UriParser::toString( _endpoint ), lwt ); | |
139 | 139 | } |
140 | 140 | |
141 | -void MqttClient::connect(const std::string& _endpoint) | |
141 | +void MqttClient::connect( const std::string &_endpoint, const mqtt_LWT &lwt ) | |
142 | 142 | { |
143 | 143 | LogInfo( "MqttClient", std::string( m_clientId + " - Request connect" ) ); |
144 | 144 | |
... | ... | @@ -159,7 +159,8 @@ void MqttClient::connect(const std::string& _endpoint) |
159 | 159 | } |
160 | 160 | } |
161 | 161 | m_endpoint = _endpoint; |
162 | - if (!m_principalClient) { | |
162 | + if (!m_principalClient) | |
163 | + { | |
163 | 164 | std::string derivedClientId(generateUniqueClientId(m_clientId, 1)); |
164 | 165 | m_principalClient = std::make_unique<ClientPaho>( |
165 | 166 | m_endpoint, |
... | ... | @@ -169,7 +170,8 @@ void MqttClient::connect(const std::string& _endpoint) |
169 | 170 | } |
170 | 171 | client = m_principalClient.get(); |
171 | 172 | } |
172 | - client->connect(true); | |
173 | + | |
174 | + client->connect( true, lwt ); | |
173 | 175 | } |
174 | 176 | |
175 | 177 | void MqttClient::disconnect() | ... | ... |