#pragma once #include #include #include "utils/log.h" extern "C" { #include "mqtt/MQTTClient.h" } namespace base { class MQTTClientCXX { public: /** * tcp://192.168.1.222:1883 * clientid */ MQTTClientCXX(const std::string &server_addr, const std::string &client_id): mqtt_client_(NULL), mqtt_conn_opts_(MQTTClient_connectOptions_initializer) { MQTTClient_create(&mqtt_client_, server_addr.c_str(), client_id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL); mqtt_conn_opts_.keepAliveInterval = 20; mqtt_conn_opts_.cleansession = 1; mqtt_conn_opts_.username = "linux"; mqtt_conn_opts_.password = "linux_indoor_password."; delivered_token_ = 0; receiver_ = NULL; MQTTClient_setCallbacks(mqtt_client_, this, OnConnectionLost, OnMessageArrived, OnMessageDelivered); } ~MQTTClientCXX() { MQTTClient_destroy(&mqtt_client_); } //发布 bool Publish(const std::string& topic, const std::string &payload, int timeout_milliseconds) { MQTTClient_message pubmsg = MQTTClient_message_initializer; pubmsg.payload = (void*)payload.c_str(); pubmsg.payloadlen = payload.length(); pubmsg.qos = 1; pubmsg.retained = 0; int ret = 0; MQTTClient_deliveryToken token; MQTTClient_publishMessage(mqtt_client_, topic.c_str(), &pubmsg, &token); if ((ret = MQTTClient_waitForCompletion(mqtt_client_, token, timeout_milliseconds)) != MQTTCLIENT_SUCCESS) { LOGD("MQTTClient_waitForCompletion failed"); return false; } return true; } //消息传递时 static void OnMessageDelivered(void *context, MQTTClient_deliveryToken dt) { LOGD("Message with token value %d delivery confirmed\n", dt); MQTTClientCXX* client = (MQTTClientCXX*) context; client->delivered_token_ = dt; } //消息到达时 static int OnMessageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) { LOGE("topicName = %s, len = %d", topicName, topicLen); std::string payload((char*)message->payload, message->payloadlen); MQTTClientCXX* client = (MQTTClientCXX*) context; if (client->receiver_) { client->receiver_(client->user_param_, std::string(topicName), payload); } MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } //连接丢失 static void OnConnectionLost(void *context, char *cause) { LOGD("\nConnection lost\n"); LOGD(" cause: %s\n", cause); } typedef void (*MessageReceiver)(void* user_param, const std::string &topic, const std::string &payload); //设置消息接收者 void set_message_receiver(void* user_param, MessageReceiver receiver) { user_param_ = user_param; receiver_ = receiver; } //订阅 bool Subscribes(const std::string &topic, int qos) { int ret = MQTTClient_subscribe(mqtt_client_, topic.c_str(), qos); if (MQTTCLIENT_SUCCESS != ret) { LOGD("subscribe failed: %s", MQTTClient_strerror(ret)); return false; } return true; } //退订 bool Unsubscribes(const std::string &topic) { int ret = MQTTClient_unsubscribe(mqtt_client_, topic.c_str()); if (ret != MQTTCLIENT_SUCCESS) { LOGD("MQTTClient failed to unsubscribe : %s", MQTTClient_strerror(ret)); return false; } return true; } //已连接 bool Connected() { return MQTTClient_isConnected(mqtt_client_); } bool Connect() { int ret = 0; if ((ret = MQTTClient_connect(mqtt_client_, &mqtt_conn_opts_)) != MQTTCLIENT_SUCCESS) { LOGD("Failed to connect, return code %d", ret); return false; } return true; } //断开连接 void Disconnect(int timeout_milliseconds) { MQTTClient_disconnect(mqtt_client_, timeout_milliseconds); } private: MQTTClient mqtt_client_; MQTTClient_connectOptions mqtt_conn_opts_; MQTTClient_deliveryToken delivered_token_; void* user_param_; MessageReceiver receiver_; }; } /* namespace base */