Commit 2c0c99a53b977cc292da46250e3492a9a0b59526

Authored by Peter M. Groen
1 parent 76d01373

Fix connect Callbacks

include/clientpaho.h
@@ -204,12 +204,6 @@ private: @@ -204,12 +204,6 @@ private:
204 void callbackEventHandler(); 204 void callbackEventHandler();
205 205
206 /** 206 /**
207 - * @brief Callback method that is called when a first connect succeeds.  
208 - * @param reason Som extra information if there is any.  
209 - */  
210 - void onFirstConnectOnInstance(const std::string &reason);  
211 -  
212 - /**  
213 * @brief Callback method that is called when a reconnect succeeds. 207 * @brief Callback method that is called when a reconnect succeeds.
214 * @param cause The cause of the original disconnect. 208 * @param cause The cause of the original disconnect.
215 */ 209 */
@@ -221,7 +215,7 @@ private: @@ -221,7 +215,7 @@ private:
221 * The connection status is set to Connected. 215 * The connection status is set to Connected.
222 * @param response A success response with connection data. 216 * @param response A success response with connection data.
223 */ 217 */
224 - void onConnectSuccessOnInstance(const MqttSuccess& response); 218 + void onConnectSuccessOnInstance();
225 219
226 /** 220 /**
227 * @brief Callback that is called when a connect call fails after being sent to the endpoint. 221 * @brief Callback that is called when a connect call fails after being sent to the endpoint.
src/clientpaho.cpp
@@ -203,7 +203,7 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt ) @@ -203,7 +203,7 @@ std::int32_t ClientPaho::connect( bool wait, const mqtt_LWT &lwt )
203 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; 203 MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
204 conn_opts.keepAliveInterval = 5; 204 conn_opts.keepAliveInterval = 5;
205 conn_opts.cleansession = 1; 205 conn_opts.cleansession = 1;
206 - conn_opts.onSuccess = &ClientPaho::onConnectSuccess; 206 + conn_opts.onSuccess = nullptr;
207 conn_opts.onFailure = &ClientPaho::onConnectFailure; 207 conn_opts.onFailure = &ClientPaho::onConnectFailure;
208 conn_opts.context = this; 208 conn_opts.context = this;
209 conn_opts.automaticReconnect = 1; 209 conn_opts.automaticReconnect = 1;
@@ -764,23 +764,6 @@ void ClientPaho::callbackEventHandler() @@ -764,23 +764,6 @@ void ClientPaho::callbackEventHandler()
764 } 764 }
765 // ("ClientPaho", "%1 - leaving callback event handler", m_clientId); 765 // ("ClientPaho", "%1 - leaving callback event handler", m_clientId);
766 } 766 }
767 -  
768 -void ClientPaho::onFirstConnectOnInstance(const std::string &reason)  
769 -{  
770 - (void)reason;  
771 - {  
772 - OSDEV_COMPONENTS_LOCKGUARD(m_mutex);  
773 - // Remove the connect callback that is used in reconnect scenarios.  
774 - auto rc = MQTTAsync_setConnected(m_client, this, nullptr );  
775 - if (MQTTASYNC_SUCCESS != rc)  
776 - {  
777 - LogError( "[ClientPaho]", std::string( "onFirstConnectOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );  
778 - }  
779 - }  
780 -  
781 - setConnectionStatus(ConnectionStatus::Connected);  
782 -}  
783 -  
784 void ClientPaho::onConnectOnInstance(const std::string& cause) 767 void ClientPaho::onConnectOnInstance(const std::string& cause)
785 { 768 {
786 (void)cause; 769 (void)cause;
@@ -795,11 +778,8 @@ void ClientPaho::onConnectOnInstance(const std::string& cause) @@ -795,11 +778,8 @@ void ClientPaho::onConnectOnInstance(const std::string& cause)
795 setConnectionStatus(ConnectionStatus::Connected); 778 setConnectionStatus(ConnectionStatus::Connected);
796 } 779 }
797 780
798 -void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response) 781 +void ClientPaho::onConnectSuccessOnInstance()
799 { 782 {
800 - auto connectData = response.connectionData();  
801 - LogDebug( "[ClientPaho]", std::string( "onConnectSuccessOnInstance " + m_clientId + " - connected to endpoint " + connectData.serverUri() +  
802 - " (mqtt version " + std::to_string( connectData.mqttVersion() ) + ", session present " + ( connectData.sessionPresent() ? "TRUE" : "FALSE" ) + " )" ) );  
803 { 783 {
804 OSDEV_COMPONENTS_LOCKGUARD(m_mutex); 784 OSDEV_COMPONENTS_LOCKGUARD(m_mutex);
805 // Register the connect callback that is used in reconnect scenarios. 785 // Register the connect callback that is used in reconnect scenarios.
@@ -808,12 +788,14 @@ void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response) @@ -808,12 +788,14 @@ void ClientPaho::onConnectSuccessOnInstance(const MqttSuccess& response)
808 { 788 {
809 LogError( "[ClientPaho]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) ); 789 LogError( "[ClientPaho]", std::string( "onConnectSuccesOnInstance " + m_clientId + " - registering the connected callback failed with code : " + pahoAsyncErrorCodeToString(rc) ) );
810 } 790 }
  791 +
811 // For MQTTV5 792 // For MQTTV5
812 //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect); 793 //rc = MQTTAsync_setDisconnected(m_client, this, &ClientPaho::onDisconnect);
813 //if (MQTTASYNC_SUCCESS != rc) { 794 //if (MQTTASYNC_SUCCESS != rc) {
814 // // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc)); 795 // // ("ClientPaho", "onConnectSuccessOnInstance %1 - registering the disconnected callback failed with code %2", m_clientId, pahoAsyncErrorCodeToString(rc));
815 //} 796 //}
816 // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations); 797 // ("ClientPaho", "onConnectSuccessOnInstance %1 - pending operations : %2, removing operation -100", m_clientId, m_pendingOperations);
  798 +
817 m_operationResult[-100] = true; 799 m_operationResult[-100] = true;
818 m_pendingOperations.erase(-100); 800 m_pendingOperations.erase(-100);
819 } 801 }
@@ -1128,7 +1110,7 @@ void ClientPaho::onFirstConnect(void* context, char* cause) @@ -1128,7 +1110,7 @@ void ClientPaho::onFirstConnect(void* context, char* cause)
1128 { 1110 {
1129 auto *cl = reinterpret_cast<ClientPaho*>(context); 1111 auto *cl = reinterpret_cast<ClientPaho*>(context);
1130 std::string reason(nullptr == cause ? "Unknown cause" : cause); 1112 std::string reason(nullptr == cause ? "Unknown cause" : cause);
1131 - cl->pushIncomingEvent([cl, reason]() { cl->onFirstConnectOnInstance(reason); }); 1113 + cl->pushIncomingEvent([cl, reason]() { cl->onConnectSuccessOnInstance(); });
1132 } 1114 }
1133 } 1115 }
1134 1116
@@ -1146,7 +1128,7 @@ void ClientPaho::onConnect(void* context, char* cause) @@ -1146,7 +1128,7 @@ void ClientPaho::onConnect(void* context, char* cause)
1146 // static 1128 // static
1147 void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response) 1129 void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response)
1148 { 1130 {
1149 - LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSucces triggered.." ); 1131 + LogInfo( "[ClientPaho::onConnectSuccess]", "onConnectSuccess triggered.." );
1150 if (context) 1132 if (context)
1151 { 1133 {
1152 auto* cl = reinterpret_cast<ClientPaho*>(context); 1134 auto* cl = reinterpret_cast<ClientPaho*>(context);
@@ -1154,15 +1136,17 @@ void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response @@ -1154,15 +1136,17 @@ void ClientPaho::onConnectSuccess(void* context, MQTTAsync_successData* response
1154 { 1136 {
1155 // connect should always have a valid response struct. 1137 // connect should always have a valid response struct.
1156 LogError( "[ClientPaho]", "onConnectSuccess - no response data"); 1138 LogError( "[ClientPaho]", "onConnectSuccess - no response data");
  1139 + return;
1157 } 1140 }
1158 - MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));  
1159 - cl->pushIncomingEvent([cl, resp]() { cl->onConnectSuccessOnInstance(resp); }); 1141 + // MqttSuccess resp(response->token, ConnectionData(response->alt.connect.serverURI, response->alt.connect.MQTTVersion, response->alt.connect.sessionPresent));
  1142 + cl->pushIncomingEvent([cl]() { cl->onConnectSuccessOnInstance(); });
1160 } 1143 }
1161 } 1144 }
1162 1145
1163 // static 1146 // static
1164 void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response) 1147 void ClientPaho::onConnectFailure(void* context, MQTTAsync_failureData* response)
1165 { 1148 {
  1149 + LogDebug("[ClientPaho::onConnectFailure]", std::string( "Connection Failure?" ));
1166 if (context) 1150 if (context)
1167 { 1151 {
1168 auto* cl = reinterpret_cast<ClientPaho*>(context); 1152 auto* cl = reinterpret_cast<ClientPaho*>(context);