在linux下使用c语言实现MQTT通信(二.编程实现)

it2022-05-06  1

我是用的secureCRT登录的树莓派,要实现MQTT通信,就需要用到许多关于MQTT的函数,这里我用的是Paho.c库,所以首先下载库: 在git下下载paho C库git clone https://github.com/eclipse/paho.mqtt.c.git

cd paho.mqtt.c
make                 # 编译
sudo make install    # 安装

执行make之后我们在build/output/下能看到以下动态库:

iot@Public_RPi:~/caiwentao/team_lihaojie/caiwentao/mqtt/mqtt_publish/paho.mqtt/build/output $ ls libpaho-mqtt3a.so libpaho-mqtt3as.so libpaho-mqtt3c.so libpaho-mqtt3cs.so paho_c_version libpaho-mqtt3a.so.1 libpaho-mqtt3as.so.1 libpaho-mqtt3c.so.1 libpaho-mqtt3cs.so.1 samples libpaho-mqtt3a.so.1.0 libpaho-mqtt3as.so.1.0 libpaho-mqtt3c.so.1.0 libpaho-mqtt3cs.so.1.0 test

在这里说一下这里面的各个动态库的作用: paho-mqtt3a : 一般实际开发中就是使用这个,a表示的是异步消息推送(asynchronous)。 paho-mqtt3as : as表示的是异步+加密(asynchronous+OpenSSL)。 paho-mqtt3c : c 表示的是经典(classic)的同步(synchronous)客户端,采用发送+等待的阻塞模式,一般性能较差。 paho-mqtt3cs : cs表示的是同步+加密(synchronous+OpenSSL)。 本文的示例代码包含的是 MQTTClient.h(同步接口),因此链接时应使用 paho-mqtt3c。 一、发布端程序

get_temperature.c get_temperature.h get_time.c get_time.h opt_init.c opt_init.h publish.c set_signal.c set_signal.h

下面解释一下这些c代码: 1、get_temperature.c:用于获取发布端要发布的消息内容,即实验室树莓派通过 DS18B20 传感器读取到的实时温度,其代码如下:

#include<stdio.h> #include<unistd.h> #include<dirent.h> #include<string.h> #include<sys/types.h> #include<errno.h> #include<sys/stat.h> #include<fcntl.h> #include<stdlib.h> #include"get_temperature.h" int get_temperature(float *temper) { char path[128]="/sys/bus/w1/devices/"; char path_s[32]; char buf[128]; char *ptr; DIR *dirp; int a=0; int fd; struct dirent *direntp; if((dirp=opendir(path))==NULL) { printf("Open %s failure:%s\n",path,strerror(errno)); return -1; } while((direntp=readdir(dirp))!=NULL) { if(strstr(direntp->d_name,"28-")) { strcpy(path_s,direntp->d_name); a=1; } } if(a==0) { printf("Can not find ds18b20 in %s\n",path); return -1; } strncat(path,path_s,sizeof(path)); strncat(path,"/w1_slave",sizeof(path)); if((fd=open(path,O_RDONLY))<0) { printf("Open %s failure:%s\n",path,strerror(errno)); return -1; } if(read(fd,buf,sizeof(buf))<0) { printf("read data from %s failure:%s\n",path,strerror(errno)); return -1; } ptr=strstr(buf,"t="); ptr+=2; if(!ptr) { printf("ERROR:%s\n",strerror(errno)); return 1000; } *temper=atof(ptr)/1000; return 0; }

2丶get_time.c:获取当前时间,网上有很多,我的示例代码如下:

1 #include<time.h> 2 #include<stdio.h> 3 #include<string.h> 4 #include"get_time.h" 5 int get_time(char time_s[24]) 6 { 7 time_t timec; 8 char *str; 9 time(&timec); 10 str=ctime(&timec); 11 char *chr=strchr(str,'\n'); 12 *chr='\0'; 13 strcpy(time_s,str); 14 return 0; 15 16 }

3丶opt_init.c:参数初始化函数。示例代码如下:

1 #include<stdio.h> 2 #include<getopt.h> 3 #include<unistd.h> 4 #include<string.h> 5 #include<stdlib.h> 6 #include"opt_init.h" 7 void usage(char *arg) 8 { 9 printf("%s usage:\n",arg); 10 puts("-p (--port)get the port"); 11 puts("-d (--daemon)run back"); 12 puts("-a (-address)the publish address"); 13 puts("-h (--help)get the help message"); 14 puts("-i (--id) get the publish id"); 15 puts("-t (--topic)the publish topic"); 16 return ; 17 } 18 19 int opt_init(int *port,char name[],char sub_id[],char topic[],int argc,char **argv) 20 { 21 struct option opts[]= 22 { 23 {"port",required_argument,NULL,'p'}, 24 {"daemon",no_argument,NULL,'d'}, 25 {"topic",required_argument,NULL,'t'}, 26 {"sub_id",required_argument,NULL,'i'}, 27 {"address",required_argument,NULL,'a'}, 28 {"help",no_argument,NULL,'h'}, 29 {NULL,0,NULL,0} 30 }; 31 int rv; 32 while((rv=getopt_long(argc,argv,"d:p:a:i:t:h",opts,NULL))!=-1) 33 { 34 switch(rv) 35 { 36 case 'd': 37 if(daemon(0,0)<0) 38 { 39 printf("daemon error\n"); 40 return 0; 41 } 42 case 'p': 43 *port=atoi(optarg); 44 break; 45 case 'a': 46 strcpy(name,optarg); 47 break; 48 case 'i': 49 strcpy(sub_id,optarg); 50 break; 51 case 't': 52 strcpy(topic,optarg); 53 break; 54 case 'h': 55 usage(argv[0]); 56 return 0; 57 default: 58 break; 59 } 60 } 61 if(!*port||!name) 62 { 63 usage(argv[0]); 64 return -1; 65 } 66 }

4丶set_signal.c:设置信号函数,示例代码如下:

1 #include<signal.h> 2 #include<stdio.h> 3 #include<stdlib.h> 4 #include"set_signal.h" 5 6 7 int g_stop=0; 8 void signal_action(int signum) 9 { 10 if(signum==SIGKILL) 11 { 12 g_stop=1; 13 printf("kill signal makes program ended\n"); 14 } 15 else if(signum==SIGINT) 16 { 17 g_stop=1; 18 printf("Ctrl+C signal makes program ended\n"); 19 } 20 exit(0); 21 } 22 23 int set_signal(void) 24 { 25 struct sigaction sigact; 26 sigemptyset(&sigact.sa_mask); 27 sigact.sa_flags=0; 28 sigact.sa_handler=signal_action; 29 30 sigaction(SIGKILL,&sigact,0); 31 sigaction(SIGINT,&sigact,0); 32 return 0; 33 }

5丶publish.c:发布端函数,示例代码如下:

1 #include<stdio.h> 2 #include<string.h> 3 #include<stdlib.h> 4 #include<errno.h> 5 #include<unistd.h> 6 #include"MQTTClient.h" 7 #include"set_signal.h" 8 #include"get_time.h" 9 #include"get_temperature.h" 10 #include"opt_init.h" 11 //此发布端不使用回调函数 12 int main(int argc,char **argv) 13 { 14 char address[128]; 15 int port=0; 16 char topic[128]; 17 char pub_id[128]; 18 char buf[128]={'\0'}; 19 char date[128]; 20 char address_s[128]; 21 float temper; 22 int rv; 23 const int qos=1; 24 const long timeout=10000L; 25 if(set_signal()<0) 26 { 27 printf("set_signal error:%s\n",strerror(errno)); 28 return -1; 29 } 30 if(opt_init(&port,address,pub_id,topic,argc,argv)<0) 31 { 32 printf("opt_init failure:%s\n",strerror(errno)); 33 return -1; 34 } 35 snprintf(address_s,sizeof(address_s),"tcp://%s:%d",address,port); 36 MQTTClient client; 37 MQTTClient_connectOptions conn_opts=MQTTClient_connectOptions_initializer; 38 MQTTClient_message publish_msg=MQTTClient_message_initializer; 39 MQTTClient_deliveryToken token; 40 conn_opts.keepAliveInterval=60; 41 conn_opts.cleansession=1; 42 MQTTClient_create(&client,address_s,pub_id,MQTTCLIENT_PERSISTENCE_NONE,NULL); 43 if((rv=MQTTClient_connect(client,&conn_opts))!=MQTTCLIENT_SUCCESS) 44 { 45 printf("MQTTClient_connect error:%s\n",strerror(errno)); 46 return -1; 47 } 48 publish_msg.qos=qos; 49 publish_msg.retained=0; 50 while(!g_stop) 51 { 52 if(get_time(date)<0) 53 { 54 printf("get_time error:%s\n",strerror(errno)); 55 return -1; 56 } 57 if(get_temperature(&temper)<0) 58 { 59 printf("get_temperature error:%s\n",strerror(errno)); 60 return -1; 61 } 62 snprintf(buf,sizeof(buf),"RPI0001/%s/%f",date,temper); 63 publish_msg.payload=(void *)buf; 64 publish_msg.payloadlen=strlen(buf); 65 MQTTClient_publishMessage(client,topic,&publish_msg,&token); 66 printf("Waiting for %d seconds for publication of---- %s---- on topic %s for subscriber with id:%s\n",timeout/1000,buf,topic,pub_id); 67 rv=MQTTClient_waitForCompletion(client,token,timeout); 68 
printf("Message with delivery token %d delivered\n",rv); 69 sleep(30); 70 } 71 }

二丶订阅端程序:

db_init.h insert_db.h opt_init.c set_signal.c string_break.c subscribe.c db_init.c insert_db.c opt_init.h set_signal.h string_break.h

1丶db_init.c:数据库初始化函数,示例代码如下:

13 #include<stdio.h> 14 #include<sqlite3.h> 15 #include"db_init.h" 16 #include<stdlib.h> 17 int callback(void *Notused,int argc,char **argv,char **azColName) 18 { 19 int i=0; 20 for(i=0;i<argc;i++) 21 { 22 printf("%s=%s\n",azColName[i],argv[i]?argv[i]:"NULL"); 23 } 24 printf("\n"); 25 return 0; 26 } 27 28 int db_init(char *db_name,char *file_name) 29 { 30 sqlite3 *db; 31 char *zErrMsg=0; 32 int rc; 33 char sql[256]; 34 rc = sqlite3_open(db_name,&db); 35 if(rc!=0) 36 { 37 fprintf(stderr,"can't open database:%s\n",sqlite3_errmsg(db)); 38 return -1; 39 } 40 else 41 { 42 fprintf(stdout,"open database ok\n"); 43 } 44 snprintf(sql,256,"CREATE TABLE IF NOT EXISTS %s(" 45 "I INTEGER PRIMARY KEY AUTOINCREMENT ," 46 "ID CHAR(32) NOT NULL," 47 "DATE CHAR(48) NOT NULL," 48 "TEMPERATURE CHAR(32) NOT NULL);",file_name); 49 rc = sqlite3_exec(db,sql,callback,0,&zErrMsg); 50 if(rc!=SQLITE_OK) 51 { 52 fprintf(stderr,"SQL error:%s\n",zErrMsg); 53 sqlite3_free(zErrMsg); 54 return -1; 55 } 56 else 57 { 58 fprintf(stdout,"table create ok\n"); 59 } 60 sqlite3_close(db); 61 return 0; 62 }

2丶insert_db.c:插入数据库。示例代码如下:

13 #include<stdio.h> 14 #include<stdlib.h> 15 #include<sqlite3.h> 16 #include"insert_db.h" 17 int call_back(void *NotUsed,int argc , char **argv,char **azColName) 18 { 19 int i=0; 20 for(i=0;i<argc;i++) 21 { 22 printf("%s=%s\n",azColName[i],argv[i]?argv[i]:"NULL"); 23 } 24 printf("\n"); 25 return 0; 26 } 27 int insert_db(char *db_name,char *file_name,char *id,char *date,char *temper) 28 { 29 sqlite3 *db; 30 char *zErrMsg=0; 31 int rc ; 32 char sql[256]; 33 rc=sqlite3_open(db_name,&db); 34 if(rc!=0) 35 { 36 fprintf(stderr,"can't open database:%s\n",sqlite3_errmsg(db)); 37 return -1; 38 } 39 else 40 { 41 fprintf(stderr,"open database ok\n"); 42 } 43 snprintf(sql,256,"INSERT INTO %s(I,ID,DATE,TEMPERATURE)VALUES (NULL,'%s','%s','%s'); ",file_name,id,date,temper); 44 rc=sqlite3_exec(db,sql,call_back,0,&zErrMsg); 45 if(rc!=SQLITE_OK) 46 { 47 fprintf(stderr,"SQL error:%s\n",zErrMsg); 48 sqlite3_free(zErrMsg); 49 return -1; 50 } 51 else 52 { 53 fprintf(stdout,"records careated ok\n"); 54 } 55 sqlite3_close(db); 56 return 0; 57 }

3丶string_break.c:字符串分割,示例代码如下:

1 #include "Rectemper_server.h" 2 3 int string_break(char *buf, char *sn, char *time, char *temper) 4 { 5 char *p_head, *p_end ; 6 7 p_end = strstr(buf, "/") ; 8 strncpy(sn , buf, (p_end - buf)) ; 9 printf("sn: %s\n", sn) ; 10 11 p_head = p_end + 1 ; // p_head = "2019-1-23 14:17:27/15.375000C" 12 p_end = strstr( (p_end + 1), "/") ; // p_end= "/15.375000C" 13 strncpy(time, p_head, (p_end-p_head)); 14 printf("time: %s\n", time) ; 15 16 p_head = p_end + 1 ; // p_head ="15.375000C" 17 p_end = strstr(p_end, "C") ; 18 strncpy(temper, p_head, (p_end - p_head)) ; 19 printf("Temperature: %s\n", temper) ; 20 21 }

4丶subscribe.c:订阅端程序,示例代码如下:

1 #include<stdio.h> 2 #include<stdlib.h> 3 #include<string.h> 4 #include<errno.h> 5 #include"MQTTClient.h" 6 #include<unistd.h> 7 #include"set_signal.h" 8 #include"opt_init.h" 9 #include"db_init.h" 10 #include"insert_db.h" 11 #include"string_break.h" 12 volatile MQTTClient_deliveryToken deliveredtoken; 13 14 void delivered(void *context,MQTTClient_deliveryToken dt) 15 { 16 printf("Message with token value %d delivery confirmed\n",dt); 17 deliveredtoken=dt; 18 } 19 20 int msgarrvd(void *context,char *topicName,int topicLen,MQTTClient_message *message) 21 { 22 int i; 23 char buf[128]={'\0'}; 24 char id[128]; 25 char date[128]; 26 char temper[128]; 27 char *payloadptr; 28 payloadptr=message->payload; 29 printf("Message arrived \n"); 30 printf("topic:%s\n",topicName); 31 printf("Message:%s\n",message->payload); 32 printf("message:"); 33 for(i=0;i<message->payloadlen;i++) 34 { 35 buf[i]=*payloadptr; 36 putchar(*payloadptr++); 37 } 38 printf("%s\n",buf); 39 putchar('\n'); 40 if(string_break(id,date,temper,buf)<0) 41 { 42 printf("string_break failure:%s\n",strerror(errno)); 43 return -1; 44 } 45 if(db_init("Messages","message")<0) 46 { 47 printf("db_init failure:%s\n",strerror(errno)); 48 return -1; 49 } 50 if(insert_db("Messages","message",id,date,temper)<0) 51 { 52 printf("insert_db failure:%s\n",strerror(errno)); 53 return -1; 54 } 55 MQTTClient_freeMessage(&message); 56 MQTTClient_free(topicName); 57 return 1; 58 } 59 60 void connlost(void *context,char *cause) 61 { 62 printf("Connection lost\n"); 63 printf("cause %s\n",cause); 64 } 65 66 67 int main(int argc,char **argv) 68 { 69 MQTTClient client; 70 const int qos=1; 71 const long timeout=10000L; 72 char buf[128]; 73 int port=0; 74 char address[128]; 75 char sub_id[128]; 76 char topic[128]; 77 MQTTClient_connectOptions conn_opts=MQTTClient_connectOptions_initializer; 78 int rc,ch; 79 if(set_signal()<0) 80 { 81 printf("set_signal failure:%s\n",strerror(errno)); 82 return -1; 83 } 84 
if(opt_init(&port,address,sub_id,topic,argc,argv)<0) 85 { 86 printf("opt_init failure:%s\n",strerror(errno)); 87 return 1; 88 } 89 snprintf(buf,sizeof(buf),"tcp://%s:%d",address,port); 90 MQTTClient_create(&client,buf,sub_id,MQTTCLIENT_PERSISTENCE_NONE,NULL); 91 conn_opts.keepAliveInterval=20; 92 conn_opts.cleansession=1; 93 MQTTClient_setCallbacks(client,NULL,connlost,msgarrvd,delivered); 94 95 if((rc=MQTTClient_connect(client,&conn_opts))!=MQTTCLIENT_SUCCESS) 96 { 97 printf("MQTTClient_connect failure:%s\n",strerror(errno)); 98 return -1; 99 } 100 printf("Subscribe to topic %s for client %s using QOS %d\n",topic,sub_id,qos); 101 MQTTClient_subscribe(client,topic,qos); 102 do 103 { 104 ch=getchar(); 105 }while(ch!='q'&&ch!='Q'); 106 MQTTClient_disconnect(client,10000); 107 MQTTClient_destroy(&client); 108 return rc; 109 }

set_signal.c与opt_init.c与publish端代码一样。 三丶发布端与订阅端实现通信 下面是发布端发布时间和温度:

iot@Public_RPi:~/caiwentao/team_lihaojie/caiwentao/mqtt/mqtt_publish $ ./a.out -p 1883 -a 127.0.0.1 -i 100 -t cwt Waiting for 10 seconds for publication of---- RPI0001/Mon Jul 22 16:36:48 2019/25.937000---- on topic cwt for subscriber with id:100 Message with delivery token 0 delivered Waiting for 10 seconds for publication of---- RPI0001/Mon Jul 22 16:37:19 2019/26.000000---- on topic cwt for subscriber with id:100 Message with delivery token 0 delivered

下面是订阅端收到时间和温度:

iot@Public_RPi:~/caiwentao/team_lihaojie/caiwentao/mqtt/mqtt_subscribe $ ./a.out -p 1883 -a 127.0.0.1 -t cwt Subscribe to topic cwt for client using QOS 1 Message arrived topic:cwt Message:RPI0001/Mon Jul 22 16:36:48 2019/25.937000 message:RPI0001/Mon Jul 22 16:36:48 2019/25.937000RPI0001/Mon Jul 22 16:36:48 2019/25.937000 open database ok table create ok open database ok records careated ok Message arrived topic:cwt Message:RPI0001/Mon Jul 22 16:37:19 2019/26.000000 message:RPI0001/Mon Jul 22 16:37:19 2019/26.000000RPI0001/Mon Jul 22 16:37:19 2019/26.000000 open database ok table create ok open database ok records careated ok

检查是否存入数据库中:

iot@Public_RPi:~/caiwentao/team_lihaojie/caiwentao/mqtt/mqtt_subscribe $ sqlite3 SQLite version 3.16.2 2017-01-06 16:32:41 Enter ".help" for usage hints. Connected to a transient in-memory database. Use ".open FILENAME" to reopen on a persistent database. sqlite> .open Messages sqlite> .table message sqlite> select * from message; 1|RPI0001|Mon Jul 22 16:36:48 20195.937000 2|RPI0001|Mon Jul 22 16:37:19 20196.000000

上图显示数据已经存入数据库中,可能由于显示问题后面的时间与温度数字出现重合,正确时间是2019年,温度是25.937000。 如上,成功实现MQTT通信。


最新回复(0)