mqtt_client_cxx.hpp 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. #pragma once
  2. #include <unistd.h>
  3. #include <string>
  4. #include "utils/log.h"
  5. extern "C" {
  6. #include "mqtt/MQTTClient.h"
  7. }
  8. namespace base {
  9. class MQTTClientCXX {
  10. public:
  11. /**
  12. * tcp://192.168.1.222:1883
  13. * clientid
  14. */
  15. MQTTClientCXX(const std::string &server_addr, const std::string &client_id):
  16. mqtt_client_(NULL),
  17. mqtt_conn_opts_(MQTTClient_connectOptions_initializer) {
  18. MQTTClient_create(&mqtt_client_, server_addr.c_str(), client_id.c_str(),
  19. MQTTCLIENT_PERSISTENCE_NONE, NULL);
  20. mqtt_conn_opts_.keepAliveInterval = 20;
  21. mqtt_conn_opts_.cleansession = 1;
  22. mqtt_conn_opts_.username = "linux";
  23. mqtt_conn_opts_.password = "linux_indoor_password.";
  24. delivered_token_ = 0;
  25. receiver_ = NULL;
  26. MQTTClient_setCallbacks(mqtt_client_,
  27. this,
  28. OnConnectionLost,
  29. OnMessageArrived,
  30. OnMessageDelivered);
  31. }
  32. ~MQTTClientCXX() {
  33. MQTTClient_destroy(&mqtt_client_);
  34. }
  35. //发布
  36. bool Publish(const std::string& topic, const std::string &payload, int timeout_milliseconds) {
  37. MQTTClient_message pubmsg = MQTTClient_message_initializer;
  38. pubmsg.payload = (void*)payload.c_str();
  39. pubmsg.payloadlen = payload.length();
  40. pubmsg.qos = 1;
  41. pubmsg.retained = 0;
  42. int ret = 0;
  43. MQTTClient_deliveryToken token;
  44. MQTTClient_publishMessage(mqtt_client_, topic.c_str(), &pubmsg, &token);
  45. if ((ret = MQTTClient_waitForCompletion(mqtt_client_, token, timeout_milliseconds))
  46. != MQTTCLIENT_SUCCESS) {
  47. LOGD("MQTTClient_waitForCompletion failed");
  48. return false;
  49. }
  50. return true;
  51. }
  52. //消息传递时
  53. static void OnMessageDelivered(void *context, MQTTClient_deliveryToken dt)
  54. {
  55. LOGD("Message with token value %d delivery confirmed\n", dt);
  56. MQTTClientCXX* client = (MQTTClientCXX*) context;
  57. client->delivered_token_ = dt;
  58. }
  59. //消息到达时
  60. static int OnMessageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) {
  61. LOGE("topicName = %s, len = %d", topicName, topicLen);
  62. std::string payload((char*)message->payload, message->payloadlen);
  63. MQTTClientCXX* client = (MQTTClientCXX*) context;
  64. if (client->receiver_) {
  65. client->receiver_(client->user_param_, std::string(topicName), payload);
  66. }
  67. MQTTClient_freeMessage(&message);
  68. MQTTClient_free(topicName);
  69. return 1;
  70. }
  71. //连接丢失
  72. static void OnConnectionLost(void *context, char *cause) {
  73. LOGD("\nConnection lost\n");
  74. LOGD(" cause: %s\n", cause);
  75. }
  76. typedef void (*MessageReceiver)(void* user_param, const std::string &topic, const std::string &payload);
  77. //设置消息接收者
  78. void set_message_receiver(void* user_param, MessageReceiver receiver) {
  79. user_param_ = user_param;
  80. receiver_ = receiver;
  81. }
  82. //订阅
  83. bool Subscribes(const std::string &topic, int qos) {
  84. int ret = MQTTClient_subscribe(mqtt_client_, topic.c_str(), qos);
  85. if (MQTTCLIENT_SUCCESS != ret) {
  86. LOGD("subscribe failed: %s", MQTTClient_strerror(ret));
  87. return false;
  88. }
  89. return true;
  90. }
  91. //退订
  92. bool Unsubscribes(const std::string &topic) {
  93. int ret = MQTTClient_unsubscribe(mqtt_client_, topic.c_str());
  94. if (ret != MQTTCLIENT_SUCCESS) {
  95. LOGD("MQTTClient failed to unsubscribe : %s", MQTTClient_strerror(ret));
  96. return false;
  97. }
  98. return true;
  99. }
  100. //已连接
  101. bool Connected() {
  102. return MQTTClient_isConnected(mqtt_client_);
  103. }
  104. bool Connect() {
  105. int ret = 0;
  106. if ((ret = MQTTClient_connect(mqtt_client_, &mqtt_conn_opts_)) != MQTTCLIENT_SUCCESS) {
  107. LOGD("Failed to connect, return code %d", ret);
  108. return false;
  109. }
  110. return true;
  111. }
  112. //断开连接
  113. void Disconnect(int timeout_milliseconds) {
  114. MQTTClient_disconnect(mqtt_client_, timeout_milliseconds);
  115. }
  116. private:
  117. MQTTClient mqtt_client_;
  118. MQTTClient_connectOptions mqtt_conn_opts_;
  119. MQTTClient_deliveryToken delivered_token_;
  120. void* user_param_;
  121. MessageReceiver receiver_;
  122. };
  123. } /* namespace base */