继续上一篇关于MQTT的话题,本篇呢,我们将脱离paho的库环境,进行MQTT搭建。
环境准备
在paho.mqtt库中,简单翻阅一下,可以看到他的核心库的路径为 paho.mqtt.c-1.3.0\src
经过笔者的手撕源码,排除掉了许多冗余的部分,就只剩下列文件
将这些文件,笔者单独放到了“mqtt文件夹”内,做一个库封装起来。
封装
头文件太多太繁琐,因此需要个文件把它们集成起来。
mqtt_client.h:
#ifndef __MQTT_CLIENT_H__
#define __MQTT_CLIENT_H__
#include "MQTTClient.h"
#ifdef __cplusplus
extern "C" {
#endif
#define MQTT_SUCCESS 0
#define MQTT_FAILURE -1
#define MQTT_DISCONNECTED -3
#define MQTT_MAX_MESSAGES_INFLIGHT -4
#define MQTT_BAD_UTF8_STRING -5
#define MQTT_NULL_PARAMETER -6
#define MQTT_TOPICNAME_TRUNCATED -7
#define MQTT_BAD_STRUCTURE -8
#define MQTT_BAD_QOS -9
#define QOS_AT_MOST_ONCE 0
#define QOS_AT_LEAST_ONCE 1
#define QOS_EXACTLY_ONCE 2
#define MQTT_PORT 1883
#define MQTT_DEFAULT_TIME_OUT 3000
/* MQTT client object*/
typedef struct _mqtt_client mqtt_client;
typedef int CALLBACK_MESSAGE_ARRIVED(mqtt_client *m, char *topic, char *data, int length);
/* structure of MQTT client object*/
struct _mqtt_client {
MQTTClient client;
//int Qos; //Quality of service
int timeout; //time out (milliseconds)
CALLBACK_MESSAGE_ARRIVED *on_message_arrived;
int received_message_id;
char * received_topic;
char * received_message;
int received_message_len;
int received_topic_len;
MQTTClient_message * received_msg;
};
mqtt_client * mqtt_new(char * host, int port, char *client_id);
int mqtt_delete(mqtt_client *m);
int mqtt_connect(mqtt_client * m, char *username, char *password);
int mqtt_disconnect(mqtt_client * m);
int mqtt_is_connected(mqtt_client *m);
void mqtt_yield(void);
int mqtt_set_timeout(mqtt_client *m, int timeout);
int mqtt_set_callback_message_arrived(mqtt_client *m, CALLBACK_MESSAGE_ARRIVED * function);
int mqtt_subscribe(mqtt_client *m, char *topic, int Qos);
int mqtt_unsubscribe(mqtt_client *m, char *topic);
int mqtt_publish_data(mqtt_client * m, char *topic, void *data, int length, int Qos);
int mqtt_publish(mqtt_client * m, char *topic, char *message, int Qos);
int mqtt_receive(mqtt_client *m, unsigned long timeout);
void mqtt_sleep(int milliseconds);
#ifdef __cplusplus
}
#endif
#endif /* __MQTT_CLIENT_H__ */
mqtt_client.c:
#include <stdlib.h>
#include <memory.h>
#include <string.h>
#include <errno.h>
#include "mqtt_client.h"
#include "MQTTClientPersistence.h"
mqtt_client * mqtt_new(char * host, int port, char *client_id)
{
mqtt_client * m;
int rc;
m = malloc(sizeof(mqtt_client));
if ( m != NULL) {
memset(m , 0, sizeof(mqtt_client));
rc = MQTTClient_create(&(m->client), host, client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
if ( rc == MQTTCLIENT_SUCCESS ) {
m->timeout = MQTT_DEFAULT_TIME_OUT;
m->received_msg = NULL;
//mqtt_set_callback_message_arrived(m, m->on_message_arrived);
} else {
free(m);
errno = rc;
m = NULL;
return NULL;
}
}
return m;
}
int mqtt_connect(mqtt_client * m, char *username, char *password)
{
int rc;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
if (!m) return -1;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
rc = MQTTClient_connect(m->client, &conn_opts);
return rc;
}
int mqtt_disconnect(mqtt_client * m)
{
if (!m) return -1;
return MQTTClient_disconnect(m->client, 10000);
}
int mqtt_is_connected(mqtt_client *m)
{
if (!m) return 0;
return MQTTClient_isConnected(m->client);
}
void mqtt_yield(void)
{
MQTTClient_yield();
}
int mqtt_set_timeout(mqtt_client *m, int timeout)
{
if (!m) return -1;
m->timeout = timeout;
return MQTT_SUCCESS;
}
static int internal_callback_message_arrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
mqtt_client *m;
m = (mqtt_client *)context;
if (!m) return -1;
if (m->on_message_arrived == NULL) return -1;
if ( topicName[topicLen] != 0 )
topicName[topicLen] = 0;
return m->on_message_arrived(m, topicName, message->payload, message->payloadlen);
}
static void internal_callback_connectionLost(void *context, char *cause)
{
return;
}
void internal_callback_delivery_complete(void *context, MQTTClient_deliveryToken dt)
{
return;
}
int mqtt_set_callback_message_arrived(mqtt_client *m, CALLBACK_MESSAGE_ARRIVED * function)
{
int ret;
if (!m) return -1;
m->on_message_arrived = function;
ret = MQTTClient_setCallbacks(m->client, m,
internal_callback_connectionLost, //MQTTClient_connectionLost * cl,
internal_callback_message_arrived, //MQTTClient_messageArrived * ma,
internal_callback_delivery_complete //MQTTClient_deliveryComplete * dc
);
return ret;
}
int mqtt_subscribe(mqtt_client *m, char *topic, int Qos)
{
if (!m) return -1;
return MQTTClient_subscribe (m->client, topic, Qos);
}
int mqtt_unsubscribe(mqtt_client *m, char *topic)
{
if (!m) return -1;
return MQTTClient_unsubscribe (m->client, topic);
}
int mqtt_delete(mqtt_client *m)
{
if (!m) return -1;
MQTTClient_destroy(&(m->client));
return 0;
}
int mqtt_publish_data(mqtt_client * m, char *topic, void *data, int length, int Qos)
{
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token = -1;
int rc;
if (!m) return -1;
pubmsg.payload = data;
pubmsg.payloadlen = length;
pubmsg.qos = Qos;
pubmsg.retained = 0;
rc = MQTTClient_publishMessage(m->client, topic, &pubmsg, &token);
if ( rc != MQTTCLIENT_SUCCESS )
return rc;
if ( m->timeout > 0 ) {
rc = MQTTClient_waitForCompletion(m->client, token, m->timeout);
if ( rc != MQTTCLIENT_SUCCESS )
return rc;
else
return token;
}
return token;
}
int mqtt_publish(mqtt_client * m, char *topic, char *message, int Qos)
{
return mqtt_publish_data(m, topic, message, strlen(message), Qos);
}
static void mqtt_clear_received(mqtt_client *m)
{
if (!m) return;
//MQTTClient_freeMessage();
if ( m->received_msg != NULL ) {
//free(m->received_msg->payload);
//free(m->received_msg);
m->received_msg = NULL;
m->received_message = NULL;
m->received_message_len = 0;
m->received_message_id = 0;
}
/*
if ( m->received_topic != NULL) {
free(m->received_topic);
m->received_topic = NULL;
m->received_topic_len = 0;
}
*/
}
int mqtt_receive(mqtt_client *m, unsigned long timeout)
{
int rc;
if (!m) return -1;
mqtt_clear_received(m);
rc = MQTTClient_receive(m->client, &(m->received_topic),
&(m->received_topic_len), &(m->received_msg), timeout);
if ( rc == MQTTCLIENT_SUCCESS || rc == MQTTCLIENT_TOPICNAME_TRUNCATED ) {
if ( m->received_msg == NULL) {
rc = -1;
} else {
rc = MQTTCLIENT_SUCCESS;
if ( m->received_topic[m->received_topic_len] != 0 )
m->received_topic[m->received_topic_len] = 0;
m->received_message = m->received_msg->payload;
m->received_message_len = m->received_msg->payloadlen;
m->received_message_id = m->received_msg->msgid;
}
}
return rc;
}
void mqtt_sleep(int milliseconds)
{
MQTTClient_sleep(milliseconds);
}
函数命名都是根据功能来的,所以笔者也没多打注释了。
把这两个文件也放到mqtt文件夹内。
MQTT数据订阅端
废话不多说,先上代码为敬:
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include "mqtt/mqtt_client.h"
int running = 1;
void stop_running(int sig)
{
signal(SIGINT, NULL);
running = 0;
}
int main(int argc, char ** argv) {
mqtt_client *m; //mqtt_client 对象指针
int ret; //返回值
char *host = "localhost:1883";//测试服务器
char *topic = "MQTT/test"; //主题
char *client_id = "fs1704";//客户端ID; 对测试服务器,可以随便写
char *username = NULL;//用户名,用于验证身份。对测试服务器,无。
char *password = NULL;//密码,用于验证身份。对测试服务器,无。
int Qos; //Quality of Service
//create new mqtt client object
m = mqtt_new(host, MQTT_PORT, client_id); //创建对象,MQTT_PORT = 1883
if ( m == NULL ) {
printf("mqtt client create failure, return code = %d\n", errno);
return 1;
} else {
printf("mqtt client created\n");
}
//connect to server
ret = mqtt_connect(m, username, password); //连接服务器
if (ret != MQTT_SUCCESS ) {
printf("mqtt client connect failure, return code = %d\n", ret);
return 1;
} else {
printf("mqtt client connect\n");
}
//subscribe
Qos = QOS_EXACTLY_ONCE;
ret = mqtt_subscribe(m, topic, Qos);//订阅消息
printf("mqtt client subscribe %s, return code = %d\n", topic, ret);
signal(SIGINT, stop_running);
signal(SIGTERM, stop_running);
printf("wait for message of topic: %s ...\n", topic);
//loop: waiting message
while (running) {
int timeout = 200;
if ( mqtt_receive(m, timeout) == MQTT_SUCCESS ) { //recieve message,接收消息
printf("received Topic=%s, Message=%s\n", m->received_topic, m->received_message);
}
mqtt_sleep(200); //sleep a while
}
mqtt_disconnect(m); //disconnect
printf("mqtt client disconnect");
mqtt_delete(m); //delete mqtt client object
return 0;
}
可以看到,订阅消息是由mqtt_subscribe()函数完成,要订阅多个消息,只需要反复调用这个函数就行了。
要做收发信息处理,就在while(running)里面strump就行了,这个比较简单,就自己根据情况倒腾吧。
MQTT数据发布端
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include "mqtt/mqtt_client.h"
int main(int argc, char ** argv) {
mqtt_client *m; //mqtt_client 对象指针
int ret; //返回值
char *host = "localhost:1883";//测试服务器
char *topic = "MQTT/test"; //主题
char *client_id = "fs1704";//客户端ID; 对测试服务器,可以随便写
char *username = NULL;//用户名,用于验证身份。对测试服务器,无。
char *password = NULL;//密码,用于验证身份。对测试服务器,无。
int Qos; //Quality of Service
//create new mqtt client object
m = mqtt_new(host, MQTT_PORT, client_id); //创建对象,MQTT_PORT = 1883
if ( m == NULL ) {
printf("mqtt client create failure, return code = %d\n", errno);
return 1;
} else {
printf("mqtt client created\n");
}
//connect to server
ret = mqtt_connect(m, username, password); //连接服务器
if (ret != MQTT_SUCCESS ) {
printf("mqtt client connect failure, return code = %d\n", ret);
return 1;
} else {
printf("mqtt client connect\n");
}
//publish message
Qos = QOS_EXACTLY_ONCE; //Qos
ret = mqtt_publish(m, topic, "This is MQTT publicer", Qos);//发布消息
printf("mqtt client publish, return code = %d\n", ret);
mqtt_disconnect(m); //disconnect
mqtt_delete(m); //delete mqtt client object
return 0;
}
可以看到发布话题是通过mqtt_publish函数实现的。具体内容看上面封装的函数库吧。
测试结果
笔者通过Jetson Nano亲测成功,因此,笔者更希望读者们能自行完成这些操作,这样可能会对自身学习更好一点。
咱也就不放图了,看源码应该就能想到会产生怎样的画面了吧。
闲谈
笔者辛辛苦苦的手撕paho.MQTT库,因为我的项目内容需要用到。但是当我测试完后,要把这个库移植到我项目里面,发现。我的项目环境是C++,而这版本的是C语言。C语言下不能调用带有C++语法的函数(命名空间类型的)。因此,笔者踏上了怎么用C++来套这库的路上。
本篇文章,最终代码见(5积分,买不了吃亏,买不了上当,绝对值了):https://download.csdn.net/download/qq_25662827/77129677https://download.csdn.net/download/qq_25662827/77129677