为客户端设置MQTTAsync_connected()回调函数
DLLExport int MQTTAsync_setConnected(MQTTAsync handle, void* context, MQTTAsync_connected* co);
其中回调函数的原型
typedef void MQTTAsync_connected(void* context, char* cause);
此函数尝试将客户端订阅到单个主题
DLLExport int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response);
订阅端:MQTTAsync_subscribe.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "MQTTAsync.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientSub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L
volatile MQTTAsync_token deliveredtoken;
int disc_finished = 0;
int subsribed = 0;
int finished = 0;
void connlost(void *context, char *cause)
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnect lost\n");
if(cause)
printf("cause: %s\n", cause);
printf("Reconneting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if((rc = MQTTAsync_connect(client, &conn_opts))!= MQTTASYNC_SUCCESS)
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
int msgarrvd(void *context, char *topicName, int topiclen, MQTTAsync_message *message)
int i;
char *payloadptr;
printf("Message arrived\n");
printf("topic: %s\n", topicName);
printf("message:");
payloadptr = message->payload;
for(i = 0; i < message->payloadlen; i++)
putchar(*payloadptr++);
putchar('\n');
MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName);
return 1;
void onDisconnet(void *context, MQTTAsync_successData *response)
printf("Successful disconnection\n");
disc_finished = 1;
void onSubscribe(void *context, MQTTAsync_successData *response)
printf("Subsrive successed\n");
subsribed = 1;
void onSubsribeFailure(void *context, MQTTAsync_failureData *response)
printf("Subscribe failed, rc %d\n", response ? response->code : 0);
finished = 1;
void onConnectFailure(void *context, MQTTAsync_failureData *response)
printf("Connect failed, rc %d\n", response ? response->code : 0);
finished = 1;
void onConnect(void *context, MQTTAsync_successData *response)
MQTTAsync client= (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc;
printf("Successful connection\n");
printf("Subscruibing to topic %s\n for client %s\n using Qos %d\n\n" "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
opts.onSuccess = onSubscribe;
opts.onFailure = onSubsribeFailure;
opts.context = client;
deliveredtoken = 0;
if((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
printf("Failed to start subsribe, return code %d\n", rc);
exit(EXIT_FAILURE);
int main(int argc, char* argv[])
int rc;
int ch;
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, client, connlost, msgarrvd, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
while(!subsribed)
sleep(3);
if(finished)
goto exit;
ch = getchar();
} while(ch != 'Q' && ch != 'q');
disc_opts.onSuccess = onDisconnet;
if((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
while(!disc_finished)
sleep(3);
exit:
MQTTAsync_destroy(&client);
return rc;
发布端:MQTTAsync_publish.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "MQTTAsync.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientPub"
#define TOPIC "MQTT Examples"
#define PAYLOAD "Hello World"
#define QOS 1
#define TIMEOUT 10000L
volatile MQTTAsync_token deliveredtoken;
int finished = 0;
void connlost(void *context, char *cause)
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
int rc;
printf("\nConnect lost\n");
printf("cause: %s\n", cause);
printf("Reconnecting\n");
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
if((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
printf("Failed to start connect, return code %d\n", rc);
finished = 1;
void onDisconnect(void *context, MQTTAsync_successData *response)
printf("Successful disconnection\n");
finished = 1;
void onSend(void *context, MQTTAsync_successData *response)
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
int rc;
printf("Message with token value %d delivery confirmed\n", response->token);
opts.onSuccess = onDisconnect;
opts.context = client;
if((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
void onConnectFailure(void *context, MQTTAsync_failureData *response)
printf("Connect failed, rc %d\n", response? response->code : 0);
finished = 1;
void onConnect(void *context, MQTTAsync_successData *response)
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc;
printf("Successful connection\n");
opts.onSuccess = onSend;
opts.context = client;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = (int)strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
deliveredtoken = 0;
if((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
printf("Failed to start sendMessage, return code %d\n", rc);
exit(EXIT_FAILURE);
int main(int argc, char* argv[])
int rc;
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTAsync_setCallbacks(client, NULL, connlost, NULL, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
if((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
printf("Waiting for publication of %s \n"
"on topic %s for client with ClientID %s\n",
PAYLOAD, TOPIC, CLIENTID);
while(!finished)
sleep(4);
MQTTAsync_destroy(&client);
return rc;
源码路径:https://download.csdn.net/download/weixin_36209467/13070777
文章目录使用MQTTAsync_sendMessage发送消息时产生Segement FaultSeg Fault信息第一种Segement Fault情况 SIGSEGV第二种Segement Fault情况 SIGABRT解决方法参考
使用MQTTAsync_sendMessage发送消息时产生Segement Fault
在paho.mqtt.c的1.3.8版本之前(我是出现在1.3.4中),使用MQTTAsync_sendMessage发送大量数据会产生Segement Fault错误,
Seg F
至于怎么连接,订阅,发布的,这里就不重复说了,网上很多,详细看官方文档,
https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/struct_m_q_t_t_async__connect_options.html#a7902ce4d11b96d8b19582bdd1f82b630
1,特别说明,重连...
//MQTT注册(连接)
bool RegisterMQTT(const std::string& user, const std::string& pwd, const std::string& serviceIp, const std::string& source)
int rc = -1;
rc = MQTTAsync_create(&m_client, serviceIp.c_str(), source.c_str(), MQTTCLIENT_PERS
同步的概念
同步,我的理解是一种线性执行的方式,执行的流程不能跨越。一般用于流程性比较强的程序,我们做的用户登录功能也是同步处理的,必须用户通过用户名和密码验证后才能进入系统的操作。
异步的概念
异步,是一种并行处理的方式,不必等待一个程序执行完,可以执行其它的任务。在程序中异步处理的结果通常使用回调函数来处理结果。在JavaScript中实现异步的方式主要有Ajax和H5新增的Web Work...
1、同步、异步有什么区别
在进行网络编程时,我们通常会看到同步、异步、阻塞、非阻塞四种调用方式以及他们的组合。
其中同步方式、异步方式主要是由客户端(client)控制的,具体如下:
同步(Sync)
所谓同步,就是发出一个功能调用时,在没有得到结果之前,该调用就不返回或继续执行后续操作。
根据这个定义,Java中所有方法都是同步调用,应为必须要等到结果后才会继续执行。我们在说同...
写在前面通过之前MQTT系列-Eclipse.Paho源码分析(二)-消息的发送与接收的介绍,相信仔细阅读过的小伙伴已经对Eclipse.Paho内部发送和订阅消息的流程有了一个较为清晰的认识,今天我们就把剩下的边角料扫一扫,也就是Eclipse.Paho作为客户端是如何进行容灾补偿和心跳的相关介绍。心跳机制首先了解一下在MQTT协议中心跳请求和响应是如何规定的。下面是官...
mqtt服务器退出时,直接在connlost回调函数中调用MQTTAsync_connect函数进行重新连接会出错。
经查询,paho-mqtt库提供了重连机制。要将automaticReconnect参数设置为1,且调用MQTTAsync_setConnect