Liunx下用C语言实现MQTT的接收与发送(下)

news/2024/7/8 11:47:25

继续上一篇关于MQTT的话题,本篇呢,我们将脱离paho的库环境,进行MQTT搭建。


环境准备

在paho.mqtt库中,简单翻阅一下,可以看到他的核心库的路径为 paho.mqtt.c-1.3.0\src

经过笔者的手撕源码,排除掉了许多冗余的部分,就只剩下列文件

将这些文件,笔者单独放到了“mqtt文件夹”内,做一个库封装起来。


封装

头文件太多太繁琐,因此需要个文件把它们集成起来。

mqtt_client.h:

#ifndef __MQTT_CLIENT_H__
#define __MQTT_CLIENT_H__

#include "MQTTClient.h"

#ifdef __cplusplus
extern "C" {
#endif

#define MQTT_SUCCESS   0
#define MQTT_FAILURE  -1
#define MQTT_DISCONNECTED -3
#define MQTT_MAX_MESSAGES_INFLIGHT -4
#define MQTT_BAD_UTF8_STRING -5
#define MQTT_NULL_PARAMETER -6
#define MQTT_TOPICNAME_TRUNCATED -7
#define MQTT_BAD_STRUCTURE -8
#define MQTT_BAD_QOS -9
#define QOS_AT_MOST_ONCE  0
#define QOS_AT_LEAST_ONCE 1
#define QOS_EXACTLY_ONCE  2
#define MQTT_PORT 1883
#define MQTT_DEFAULT_TIME_OUT  3000


/* MQTT client object*/
typedef struct _mqtt_client mqtt_client;

typedef int CALLBACK_MESSAGE_ARRIVED(mqtt_client *m, char *topic, char *data, int length);

/* structure of MQTT client object*/
struct _mqtt_client {
	MQTTClient client;
	//int Qos;     //Quality of service
	int timeout; //time out (milliseconds)
	CALLBACK_MESSAGE_ARRIVED *on_message_arrived;

	int    received_message_id;
	char * received_topic;
	char * received_message;
	int    received_message_len;
	int    received_topic_len;

	MQTTClient_message * received_msg;
};

mqtt_client * mqtt_new(char * host, int port, char *client_id);

int mqtt_delete(mqtt_client *m);

int mqtt_connect(mqtt_client * m, char *username, char *password);

int mqtt_disconnect(mqtt_client * m);

int mqtt_is_connected(mqtt_client *m);

void mqtt_yield(void);

int mqtt_set_timeout(mqtt_client *m, int timeout);

int mqtt_set_callback_message_arrived(mqtt_client *m, CALLBACK_MESSAGE_ARRIVED * function);

int mqtt_subscribe(mqtt_client *m, char *topic, int Qos);

int mqtt_unsubscribe(mqtt_client *m, char *topic);

int mqtt_publish_data(mqtt_client * m, char *topic, void *data, int length, int Qos);

int mqtt_publish(mqtt_client * m, char *topic, char *message, int Qos);

int mqtt_receive(mqtt_client *m, unsigned long timeout);

void mqtt_sleep(int milliseconds);


#ifdef __cplusplus
}
#endif

#endif /* __MQTT_CLIENT_H__ */

mqtt_client.c:

#include <stdlib.h>
#include <memory.h>
#include <string.h>
#include <errno.h>
#include "mqtt_client.h"
#include "MQTTClientPersistence.h"

mqtt_client * mqtt_new(char * host, int port, char *client_id)
{
	mqtt_client * m;
	int rc;

	m = malloc(sizeof(mqtt_client));
	if ( m != NULL) {
		memset(m , 0, sizeof(mqtt_client));
		rc = MQTTClient_create(&(m->client), host, client_id, MQTTCLIENT_PERSISTENCE_NONE, NULL);
		if ( rc == MQTTCLIENT_SUCCESS ) {
			m->timeout = MQTT_DEFAULT_TIME_OUT;
			m->received_msg = NULL;
			//mqtt_set_callback_message_arrived(m, m->on_message_arrived);
		} else {
			free(m);
			errno = rc;
			m = NULL;
			return NULL;
		}
	}
	return m;
}

int mqtt_connect(mqtt_client * m, char *username, char *password)
{
	int rc;
	MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;

	if (!m) return -1;

	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;

	rc = MQTTClient_connect(m->client, &conn_opts);
	return rc;
}

int mqtt_disconnect(mqtt_client * m)
{
	if (!m) return -1;
	return MQTTClient_disconnect(m->client, 10000);
}

int mqtt_is_connected(mqtt_client *m)
{
	if (!m) return 0;
	return MQTTClient_isConnected(m->client);
}

void mqtt_yield(void)
{
	MQTTClient_yield();
}

int mqtt_set_timeout(mqtt_client *m, int timeout)
{
	if (!m) return -1;
	m->timeout = timeout;
	return MQTT_SUCCESS;
}

static int internal_callback_message_arrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
	mqtt_client *m;

	m = (mqtt_client *)context;
	if (!m) return -1;
	if (m->on_message_arrived == NULL) return -1;

	if ( topicName[topicLen] != 0 )
		topicName[topicLen] = 0;

	return m->on_message_arrived(m, topicName, message->payload, message->payloadlen);
}

static void internal_callback_connectionLost(void *context, char *cause)
{
	return;
}

void internal_callback_delivery_complete(void *context, MQTTClient_deliveryToken dt)
{
	return;
}

int mqtt_set_callback_message_arrived(mqtt_client *m, CALLBACK_MESSAGE_ARRIVED * function)
{
	int ret;

	if (!m) return -1;
	m->on_message_arrived = function;
	ret = MQTTClient_setCallbacks(m->client, m,
		internal_callback_connectionLost,    //MQTTClient_connectionLost * 	cl,
		internal_callback_message_arrived,   //MQTTClient_messageArrived * 	ma,
		internal_callback_delivery_complete  //MQTTClient_deliveryComplete * 	dc
		);
	return ret;
}

int mqtt_subscribe(mqtt_client *m, char *topic, int Qos)
{
	if (!m) return -1;
	return MQTTClient_subscribe	(m->client, topic, Qos);
}

int mqtt_unsubscribe(mqtt_client *m, char *topic)
{
	if (!m) return -1;
	return MQTTClient_unsubscribe	(m->client, topic);
}

int mqtt_delete(mqtt_client *m)
{
	if (!m) return -1;
	MQTTClient_destroy(&(m->client));
	return 0;
}

int mqtt_publish_data(mqtt_client * m, char *topic, void *data, int length, int Qos)
{
	MQTTClient_message pubmsg = MQTTClient_message_initializer;
	MQTTClient_deliveryToken token = -1;
	int rc;

	if (!m) return -1;

	pubmsg.payload = data;
	pubmsg.payloadlen = length;
	pubmsg.qos = Qos;
	pubmsg.retained = 0;

	rc = MQTTClient_publishMessage(m->client, topic, &pubmsg, &token);
	if ( rc != MQTTCLIENT_SUCCESS )
		return rc;

	if ( m->timeout > 0 ) {
		rc = MQTTClient_waitForCompletion(m->client, token, m->timeout);
		if ( rc != MQTTCLIENT_SUCCESS )
			return rc;
		else
			return token;
	}

	return token;
}

int mqtt_publish(mqtt_client * m, char *topic, char *message, int Qos)
{
	return mqtt_publish_data(m, topic, message, strlen(message), Qos);
}

static void mqtt_clear_received(mqtt_client *m)
{
	if (!m) return;

	//MQTTClient_freeMessage();

	if ( m->received_msg != NULL ) {
		//free(m->received_msg->payload);
		//free(m->received_msg);
		m->received_msg = NULL;
		m->received_message = NULL;
		m->received_message_len = 0;
		m->received_message_id = 0;
	}

	/*
	if ( m->received_topic != NULL) {
		free(m->received_topic);
		m->received_topic = NULL;
		m->received_topic_len = 0;
	}
	*/

}

int mqtt_receive(mqtt_client *m, unsigned long timeout)
{
	int rc;

	if (!m) return -1;

	mqtt_clear_received(m);

	rc = MQTTClient_receive(m->client, &(m->received_topic),
			&(m->received_topic_len), &(m->received_msg), timeout);

	if ( rc == MQTTCLIENT_SUCCESS || rc == MQTTCLIENT_TOPICNAME_TRUNCATED ) {
		if ( m->received_msg == NULL) {
			rc = -1;
		} else {
			rc = MQTTCLIENT_SUCCESS;
			if ( m->received_topic[m->received_topic_len] != 0 )
				m->received_topic[m->received_topic_len] = 0;
			m->received_message = m->received_msg->payload;
			m->received_message_len = m->received_msg->payloadlen;
			m->received_message_id = m->received_msg->msgid;
		}
	}

	return rc;
}

void mqtt_sleep(int milliseconds)
{
	MQTTClient_sleep(milliseconds);
}

函数命名都是根据功能来的,所以笔者也没多打注释了。

把这两个文件也放到mqtt文件夹内。


MQTT数据订阅端

废话不多说,先上代码为敬:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include "mqtt/mqtt_client.h"

int running = 1;

void stop_running(int sig)
{
	signal(SIGINT, NULL);
	running = 0;
}


int main(int argc, char ** argv) {

	mqtt_client *m; //mqtt_client 对象指针
	int ret; //返回值
	char *host = "localhost:1883";//测试服务器
	char *topic = "MQTT/test"; //主题
	char *client_id = "fs1704";//客户端ID; 对测试服务器,可以随便写
	char *username = NULL;//用户名,用于验证身份。对测试服务器,无。
	char *password = NULL;//密码,用于验证身份。对测试服务器,无。
	int Qos; //Quality of Service

	//create new mqtt client object
	m = mqtt_new(host, MQTT_PORT, client_id); //创建对象,MQTT_PORT = 1883
	if ( m == NULL ) {
		printf("mqtt client create failure, return  code = %d\n", errno);
		return 1;
	} else {
		printf("mqtt client created\n");
	}

	//connect to server
	ret = mqtt_connect(m, username, password); //连接服务器
	if (ret != MQTT_SUCCESS ) {
		printf("mqtt client connect failure, return code = %d\n", ret);
		return 1;
	} else {
		printf("mqtt client connect\n");
	}

	//subscribe
	Qos = QOS_EXACTLY_ONCE;
	ret = mqtt_subscribe(m, topic, Qos);//订阅消息
	printf("mqtt client subscribe %s,  return code = %d\n", topic, ret);

	signal(SIGINT, stop_running);
	signal(SIGTERM, stop_running);

	printf("wait for message of topic: %s ...\n", topic);

	//loop: waiting message
	while (running) {
		int timeout = 200;
		if ( mqtt_receive(m, timeout) == MQTT_SUCCESS ) { //recieve message,接收消息
			printf("received Topic=%s, Message=%s\n", m->received_topic, m->received_message);
		}
		mqtt_sleep(200); //sleep a while
	}

	mqtt_disconnect(m); //disconnect
	printf("mqtt client disconnect");
	mqtt_delete(m);  //delete mqtt client object
	return 0;
}

可以看到,订阅消息是由mqtt_subscribe()函数完成,要订阅多个消息,只需要反复调用这个函数就行了。

要做收发信息处理,就在while(running)里面strump就行了,这个比较简单,就自己根据情况倒腾吧。


MQTT数据发布端

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include "mqtt/mqtt_client.h"

int main(int argc, char ** argv) {
	mqtt_client *m; //mqtt_client 对象指针
	int ret; //返回值
	char *host = "localhost:1883";//测试服务器
	char *topic = "MQTT/test"; //主题
	char *client_id = "fs1704";//客户端ID; 对测试服务器,可以随便写
	char *username = NULL;//用户名,用于验证身份。对测试服务器,无。
	char *password = NULL;//密码,用于验证身份。对测试服务器,无。
	int Qos; //Quality of Service

	//create new mqtt client object
	m = mqtt_new(host, MQTT_PORT, client_id); //创建对象,MQTT_PORT = 1883
	if ( m == NULL ) {
		printf("mqtt client create failure, return  code = %d\n", errno);
		return 1;
	} else {
		printf("mqtt client created\n");
	}

	//connect to server
	ret = mqtt_connect(m, username, password); //连接服务器
	if (ret != MQTT_SUCCESS ) {
		printf("mqtt client connect failure, return code = %d\n", ret);
		return 1;
	} else {
		printf("mqtt client connect\n");
	}

	//publish message
	Qos = QOS_EXACTLY_ONCE; //Qos
	ret = mqtt_publish(m, topic, "This is MQTT publicer", Qos);//发布消息
	printf("mqtt client publish,  return code = %d\n", ret);

	mqtt_disconnect(m); //disconnect
	mqtt_delete(m);  //delete mqtt client object
	return 0;
}

可以看到发布话题是通过mqtt_publish函数实现的。具体内容看上面封装的函数库吧。


测试结果

笔者通过Jetson Nano亲测成功,因此,笔者更希望读者们能自行完成这些操作,这样可能会对自身学习更好一点。

咱也就不放图了,看源码应该就能想到会产生怎样的画面了吧。


闲谈

笔者辛辛苦苦的手撕paho.MQTT库,因为我的项目内容需要用到。但是当我测试完后,要把这个库移植到我项目里面,发现。我的项目环境是C++,而这版本的是C语言。C语言下不能调用带有C++语法的函数(命名空间类型的)。因此,笔者踏上了怎么用C++来套这库的路上。

本篇文章,最终代码见(5积分,买不了吃亏,买不了上当,绝对值了):https://download.csdn.net/download/qq_25662827/77129677https://download.csdn.net/download/qq_25662827/77129677


http://www.niftyadmin.cn/n/2278872.html

相关文章

Liunx下用C++实现MQTT的接收与发送

继续上次《Liunx下用C语言实现MQTT的接收与发送&#xff08;上&#xff09;&#xff08;下&#xff09;》&#xff0c;本人探寻了如何基于paho.mqtt.c库开发C版本的MQTT。事实上&#xff0c;paho官网有paho.mqtt.c库&#xff0c;不过笔者进去后玩不怎么转&#xff0c;可能是笔者…

Linux下使用C/C++获取可用串口

最近在Jetson Nano上发现个问题&#xff0c;插多个USB-TTL后&#xff0c;串口号会发生变化。看来笔者之前 C/C语言读写UART或USB串口数据https://blog.csdn.net/qq_25662827/article/details/122581819的同学应该都知道&#xff0c;笔者在代码中是将端口号写死了的。 面对多变…

面向APP抓包方法

最近呢&#xff0c;准备做个自动打卡的东西&#xff0c;不过呢&#xff0c;那APP是通过H5实现的。于是乎。就打算采用个蠢办法。 就是抓包&#xff0c;通过解析抓包&#xff0c;将包发过去就可以实现打卡了撒。 准备工作 本教程采用Fiddler软件进行抓包。在各大应用商城APP里应…

winliunx下C语言获取时间戳方法(秒级,毫秒级)

最近呢&#xff0c;项目需要获取时间戳&#xff0c;作为数据包中的内容。因此才有了此篇。 我们通常所用的时间戳呢&#xff0c;是Unix时间戳。Unix时间戳是从1970年1月1日&#xff08;UTC/GMT的午夜&#xff09;开始所经过的秒数&#xff0c;不考虑闰秒。 有个实用的时间戳获…

面向C/C++的json解析和合成的第三方库cJson

我们在项目中经常要用到json格式数据进行通讯&#xff0c;特别是还要做ARM开发板上实现&#xff0c;处理JSON&#xff0c;自己手撕数据处理是件麻烦的事情&#xff0c;不过现在我们有第三方库了&#xff01;那就是cJson&#xff01; 环境准备 cJson官方库&#xff1a; https:/…

C++ STL list详解

写这个博客专栏就是为了以后备用。 简介 list是一种序列式容器。list容器完成的功能实际上和数据结构中的双向链表是极其相似的&#xff0c;list中的数据元素是通过链表指针串连成逻辑意义上的线性表&#xff0c;list不仅是一个双向链表&#xff0c;而其还是一个环状双向链表。…

C++ STL vector详解

写这个博客专栏就是为了以后备用。 简介 vector(向量):是一种顺序容器&#xff0c;事实上和数组差不多&#xff0c;但它比数组更优越。一般来说数组不能动态拓展&#xff0c;因此在程序运行的时候不是浪费内存&#xff0c;就是造成越界。而vector正好弥补了这个缺陷&#xff0c…

C++ STL map详解

写这个博客专栏就是为了以后备用。 简介 Map是STL的一个关联容器&#xff0c;它提供一对一&#xff08;其中第一个可以称为关键字&#xff0c;每个关键字只能在map中出现一次&#xff0c;第二个可能称为该关键字的值&#xff09;的数据处理能力&#xff0c;由于这个特性&#…