123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- #pragma once
- #include <unistd.h>
- #include <string>
- #include "utils/log.h"
- extern "C" {
- #include "mqtt/MQTTClient.h"
- }
- namespace base {
- class MQTTClientCXX {
- public:
- /**
- * tcp://192.168.1.222:1883
- * clientid
- */
- MQTTClientCXX(const std::string &server_addr, const std::string &client_id):
- mqtt_client_(NULL),
- mqtt_conn_opts_(MQTTClient_connectOptions_initializer) {
- MQTTClient_create(&mqtt_client_, server_addr.c_str(), client_id.c_str(),
- MQTTCLIENT_PERSISTENCE_NONE, NULL);
- mqtt_conn_opts_.keepAliveInterval = 20;
- mqtt_conn_opts_.cleansession = 1;
- mqtt_conn_opts_.username = "linux";
- mqtt_conn_opts_.password = "linux_indoor_password.";
- delivered_token_ = 0;
- receiver_ = NULL;
- MQTTClient_setCallbacks(mqtt_client_,
- this,
- OnConnectionLost,
- OnMessageArrived,
- OnMessageDelivered);
- }
- ~MQTTClientCXX() {
- MQTTClient_destroy(&mqtt_client_);
- }
- //发布
- bool Publish(const std::string& topic, const std::string &payload, int timeout_milliseconds) {
- MQTTClient_message pubmsg = MQTTClient_message_initializer;
- pubmsg.payload = (void*)payload.c_str();
- pubmsg.payloadlen = payload.length();
- pubmsg.qos = 1;
- pubmsg.retained = 0;
- int ret = 0;
- MQTTClient_deliveryToken token;
- MQTTClient_publishMessage(mqtt_client_, topic.c_str(), &pubmsg, &token);
- if ((ret = MQTTClient_waitForCompletion(mqtt_client_, token, timeout_milliseconds))
- != MQTTCLIENT_SUCCESS) {
- LOGD("MQTTClient_waitForCompletion failed");
- return false;
- }
- return true;
- }
- //消息传递时
- static void OnMessageDelivered(void *context, MQTTClient_deliveryToken dt)
- {
- LOGD("Message with token value %d delivery confirmed\n", dt);
- MQTTClientCXX* client = (MQTTClientCXX*) context;
- client->delivered_token_ = dt;
- }
- //消息到达时
- static int OnMessageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
- LOGE("topicName = %s, len = %d", topicName, topicLen);
- std::string payload((char*)message->payload, message->payloadlen);
- MQTTClientCXX* client = (MQTTClientCXX*) context;
- if (client->receiver_) {
- client->receiver_(client->user_param_, std::string(topicName), payload);
- }
- MQTTClient_freeMessage(&message);
- MQTTClient_free(topicName);
- return 1;
- }
- //连接丢失
- static void OnConnectionLost(void *context, char *cause) {
- LOGD("\nConnection lost\n");
- LOGD(" cause: %s\n", cause);
- }
- typedef void (*MessageReceiver)(void* user_param, const std::string &topic, const std::string &payload);
- //设置消息接收者
- void set_message_receiver(void* user_param, MessageReceiver receiver) {
- user_param_ = user_param;
- receiver_ = receiver;
- }
- //订阅
- bool Subscribes(const std::string &topic, int qos) {
- int ret = MQTTClient_subscribe(mqtt_client_, topic.c_str(), qos);
- if (MQTTCLIENT_SUCCESS != ret) {
- LOGD("subscribe failed: %s", MQTTClient_strerror(ret));
- return false;
- }
- return true;
- }
- //退订
- bool Unsubscribes(const std::string &topic) {
- int ret = MQTTClient_unsubscribe(mqtt_client_, topic.c_str());
- if (ret != MQTTCLIENT_SUCCESS) {
- LOGD("MQTTClient failed to unsubscribe : %s", MQTTClient_strerror(ret));
- return false;
- }
- return true;
- }
- //已连接
- bool Connected() {
- return MQTTClient_isConnected(mqtt_client_);
- }
- bool Connect() {
- int ret = 0;
- if ((ret = MQTTClient_connect(mqtt_client_, &mqtt_conn_opts_)) != MQTTCLIENT_SUCCESS) {
- LOGD("Failed to connect, return code %d", ret);
- return false;
- }
- return true;
- }
- //断开连接
- void Disconnect(int timeout_milliseconds) {
- MQTTClient_disconnect(mqtt_client_, timeout_milliseconds);
- }
- private:
- MQTTClient mqtt_client_;
- MQTTClient_connectOptions mqtt_conn_opts_;
- MQTTClient_deliveryToken delivered_token_;
- void* user_param_;
- MessageReceiver receiver_;
- };
- } /* namespace base */
|