添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

关注“云腾五洲”:获取二开ThingsBoard物联网平台演示

ThingsBoard 二次开发之源码分析 5-如何接收 MQTT 连接_thingsboard

TK物联网平台:​ ​ThingsKit物联网平台​

# ThingsBoard源码分析5-如何接收MQTT连接

## 1. MQTT server

需要接收设备的MQTT连接,那么thingsboard中必然有MQTT服务器,MQTT服务器创建的类是`MqttTransportService`;

基于netty的mqtt server,添加了`MqttTransportServerInitializer`的处理类,并向`ChannelPipeline`添加了netty的`MqttDecoder`和`MqttEncoder`让我们可以忽略MQTT消息的编解码工作,重要的是添加了`MqttTransportHandler`;

# 2. MqttTransportHandler处理连接

**此例中,我们首先需要创建租户,租户管理员,并添加设备,使用MQTT Box模拟硬件设备,拷贝ACCESS TOKEN做为MQTT Box的Username开始连接我们的thingsboard后台**

![mqtt消息处理流程](https://cdn.iotschool.com/photo/2020/00e26598-e91a-4a08-b557-18b204bec6c9.png?x-oss-process=image/resize,w_1920)

如果图片看不清楚,请点击:

- 标准:https://cdn.iotschool.com/photo/2020/00e26598-e91a-4a08-b557-18b204bec6c9.png?x-oss-process=image/resize,w_1920
- 高清:https://p.pstatp.com/origin/137b60001339a846253dd

由于没有使用ssl,收到连接请求以后,便会调用

```java
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
String userName = msg.payload().userName();
log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
if (StringUtils.isEmpty(userName)) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
ctx.close();
} else {
//取出userName,构造protobuf的类(方便传输与解析),交给transportService处理。此时会使用到源码解析第三篇DefaultTransportService的解析的相关信息了解process的处理。参阅下方①的详细解析。
transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
@Override
public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
onValidateDeviceResponse(msg, ctx);
}

@Override
public void onError(Throwable e) {
log.trace("[{}] Failed to process credentials: {}", address, userName, e);
ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
ctx.close();
}
});
}
}
  1. ​DefaultTransportService​ ​的​ ​process​ ​方法构造了异步任务,成功调用​ ​onSuccess​ ​的​ ​Consumer​ ​,失败调用​ ​onFailure​ ​的​ ​Consumer​ ​;
  2. 将验证用户的任务交由​ ​transportApiRequestTemplate.send​
public ListenableFuture<Response> send(Request request) {
if (tickSize > maxPendingRequests) {
return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
}
UUID requestId = UUID.randomUUID();
request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
//由第三篇文章的分析得出,此topic时tb_transport.api.responses.localHostName
request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
//参阅第一篇基础知识的介绍,来自谷歌的库,settableFuture,可设置结果的完成
SettableFuture<Response> future = SettableFuture.create();
ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
//将future放到pendingRequests中②
pendingRequests .putIfAbsent(requestId, responseMetaData);
log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
//将消息发送给消息队列topic是tb_transport.api.requests
requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
log.trace("[{}] Request sent: {}", requestId, metadata);
}

@Override
public void onFailure(Throwable t) {
pendingRequests.remove(requestId);
future.setException(t);
}
});
return future;
}
  1. 根据第三篇​ ​TbCoreTransportApiService​ ​​的分析,我们发现​ ​DefaultTbQueueResponseTemplate​ ​​的成员变量​ ​requestTemplate​ ​​即​ ​consumer​ ​刚好是订阅的tb_transport.api.requests的消息:
......
requests.forEach(request -> {
long currentTime = System.currentTimeMillis();
long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME));
if (requestTime + requestTimeout >= currentTime) {
byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER);
if (requestIdHeader == null) {
log.error("[{}] Missing requestId in header", request);
return;
}
//获取response的topic,可以做到消息从哪来,处理好以后回哪里去,此时的topic是tb_transport.api.responses.localHostName
byte [] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER);
if (responseTopicHeader == null) {
log.error("[{}] Missing response topic in header", request);
return;
}
UUID requestId = bytesToUuid(requestIdHeader);
String responseTopic = bytesToString(responseTopicHeader);
try {
pendingRequestCount.getAndIncrement();
//调用handler进行处理消息
AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
response -> {
pendingRequestCount.decrementAndGet();
response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
//handler.hande处理的结果返回给发送方topic是tb_transport.api.responses.localHostName
responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
},
e -> {
pendingRequestCount.decrementAndGet();
if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
log.warn("[{}] Timeout to process the request: {}", requestId, request, e);
} else {
log.trace("[{}] Failed to process the request: {}", requestId, request, e);
}
},
requestTimeout,
timeoutExecutor,
callbackExecutor);
.......
  1. 具体验证逻辑:
@Override
public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();
// protobuf构造的类中判定是否包含需要验证的信息块
if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
//调用validateCredentials,具体内容就是查询deviceInfo,并将结果交由第二个Function进行进一步处理
return Futures.transform(validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
}
......
  1. 当通过设备的acess token找到了deviceInfo,便会通过消息中间件将DeviceInfo发出来,topic是 tb_transport.api.responses.localHostName ,在第三篇的分析中,​ ​DefaultTransportService​ ​​ 的​ ​transportApiRequestTemplate​ ​即订阅此topic:
List<Response> responses = responseTemplate.poll(pollInterval);
if (responses.size() > 0) {
log.trace("Polling responses completed, consumer records count [{}]", responses.size());
} else {
continue;
}
responses.forEach(response -> {
byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
UUID requestId;
if (requestIdHeader == null) {
log.error("[{}] Missing requestId in header and body", response);
} else {
requestId = bytesToUuid(requestIdHeader);
log.trace("[{}] Response received: {}" , requestId, response);
//参见上②,将验证的future放入到pendingRequests中,现在通过设置的requestId取出来
ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
if (expectedResponse == null) {
log.trace("[{}] Invalid or stale request", requestId);
} else {
//设置settableFuture的结果
expectedResponse.future.set(response);
}
}
......
  1. ​DefaultTransportService​ ​​的​ ​process​ ​​异步请求获得了返回的结果,此时调用​ ​onSuccess​ ​​回调,即调用​ ​MqttTransportHandler​ ​​的​ ​onValidateDeviceResponse​ ​;
private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {
if (!msg.hasDeviceInfo()) {
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
ctx.close();
} else {
deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
sessionInfo = SessionInfoProto.newBuilder()
.setNodeId(context.getNodeId())
.setSessionIdMSB(sessionId.getMostSignificantBits())
.setSessionIdLSB(sessionId.getLeastSignificantBits())
.setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
.setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
.setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
.setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
.setDeviceName(msg.getDeviceInfo().getDeviceName())
.setDeviceType(msg.getDeviceInfo().getDeviceType())
.build();
//创建SessionEvent.OPEN的消息,调用sendToDeviceActor方法,包含sessionInfo
transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
.......
  1. sendToDeviceActor的实现:
protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
//创建tpi,此时会选择一个固定的partition Id,组成的topic是tb_core, fullTopicName是tb_core.(int) 如: tb_core.1
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
......
//使用tbCoreMsgProducer发送到消息队列,设置了toDeviceActorMsg
tbCoreMsgProducer.send(tpi,
new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?
new TransportTbQueueCallback(callback) : null);
}
  1. 此时第二篇基于​ ​DefaultTbCoreConsumerService​ ​​可以知道​ ​DefaultTbCoreConsumerService​ ​ 的消费者订阅该主题的消息:
try {
ToCoreMsg toCoreMsg = msg.getValue();
if (toCoreMsg.hasToSubscriptionMgrMsg()) {
log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
} else if (toCoreMsg.hasToDeviceActorMsg()) {
log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg .getToDeviceActorMsg());
//交由此方法进行处理
forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
}
  1. ​forwardToDeviceActor​ ​对消息的处理
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(toDeviceActorMsg);
}
//创建type为TRANSPORT_TO_DEVICE_ACTOR_MSG的消息,并交给AppActor处理
actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
}
  1. 通过第四篇的总结3,我们可以直接去看​ ​AppActor​ ​的​ ​doProcess​ ​方法对此类型消息的处理,跟踪发现​ ​AppActor​ ​将消息转给了​ ​TenantActor​ ​, ​ ​TenantActor​ ​创建了​ ​DeviceActor​ ​,并将消息转给了​ ​DeviceActor​ ​;
  2. DeviceActor拿到此类型的消息,进行了如下的处理:
protected boolean doProcess(TbActorMsg msg) {
switch (msg.getMsgType()) {
case TRANSPORT_TO_DEVICE_ACTOR_MSG:
//包装成TransportToDeviceActorMsgWrapper交由processor处理,并继续调用processSessionStateMsgs
processor.process(ctx, (TransportToDeviceActorMsgWrapper) msg);
break;
case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
  1. ​processSessionStateMsgs​ ​的处理:
private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
UUID sessionId = getSessionId(sessionInfo);
if (msg.getEvent() == SessionEvent.OPEN) {
.....
sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
if (sessions .size() == 1) {
// 将调用pushRuleEngineMessage(stateData, CONNECT_EVENT);
reportSessionOpen();
}
//将调用pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
systemContext.getDeviceStateService().onDeviceActivity(deviceId, System.currentTimeMillis());
dumpSessions();
}
....
  1. 由于​ ​CONNECT_EVENT​ ​和​ ​ACTIVITY_EVENT​ ​仅仅类型不同,以下暂时只分析​ ​CONNECT_EVENT​
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
if (tenantId.isNullUid()) {
if (entityId.getEntityType().equals(EntityType.TENANT)) {
tenantId = new TenantId(entityId.getId());
} else {
log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);
return;
}
}
//和第7点类似,创建的tpi的fullTopicName的例子 tb_rule_engine.main.1
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg ), callback);
toRuleEngineMsgs.incrementAndGet();
}
  1. 通过第二篇的分析​ ​DefaultTbRuleEngineConsumerService​ ​订阅了此topic: tb_rule_engine.main.1的消息,收到消息以后,调用​ ​forwardToRuleEngineActor​ ​方法,包裹成​ ​QUEUE_TO_RULE_ENGINE_MSG​ ​类型的消息,交由AppActor进行分发处理;
  2. ​AppActor​ ​交给​ ​TenantActor​ ​处理,​ ​TenantActor​ ​交给​ ​RootRuleChain​ ​处理,​ ​RuleChainActor​ ​交给​ ​firstRuleNode​ ​处理,也就是某一个​ ​RuleNodeActor​ ​;
  3. 打开前端RULE CHAINS的界面,会发现,MESSAGE TYPE SWITCH是接收input的第一个节点,其实数据库的配置中, rule_chain 表中配置的 first_rule_node_id 就是​ ​TbMsgTypeSwitchNode​ ​;
  4. 进入​ ​TbMsgTypeSwitchNode​ ​的​ ​onMsg​ ​方法(实际上所有的ruleNode处理消息的方法都是​ ​onMsg​ ​),发现根据​ ​messageType​ ​(此时是​ ​CONNECT_EVENT​ ​)定义了relationtype并调用​ ​ctx.tellNext(msg, relationType)​ ​;
  5. 此时​ ​DefaultTbContext​ ​创建一个​ ​RuleNodeToRuleChainTellNextMsg​ ​,类型是​ ​RULE_TO_RULE_CHAIN_TELL_NEXT_MSG​ ​,交给​ ​RuleChainActor​ ​处理;
  6. 接下来将会进入到​ ​RuleChainActorMessageProcessor​ ​的​ ​onTellNext​ ​方法:
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
try {
checkActive(msg);
//消息来源
EntityId entityId = msg.getOriginator();
//创建一个tpi,可能会使用
TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
//查询有关系的RuleNode,其实就是从relation表中查询,该消息来源的id,relation_type和在TbMsgTypeSwitchNode定义的relationType一直的节点id,如上Connect Event就没有找到相应的relation的RuleNodeId
List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
.filter(r -> contains(relationTypes, r.getType()))
.collect(Collectors.toList());
int relationsCount = relations.size();
//Connect Event就没有找到相应的relation的RuleNodeId,消息通过规则引擎,已经处理完成
if (relationsCount == 0) {
log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
if (relationTypes.contains(TbRelationTypes.FAILURE)) {
RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
if (ruleNodeCtx != null) {
msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
} else {
log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
}
} else {
msg.getCallback().onSuccess();
}
//举例:Post telemetry的type可以找到相应的ruleNode,实现类是:TbMsgTimeseriesNode,那么此消息将会交给TbMsgTimeseriesNode处理
} else if (relationsCount == 1) {
for (RuleNodeRelation relation : relations) {
log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
pushToTarget(tpi, msg, relation.getOut(), relation.getType());
}
} else {
MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);
for (RuleNodeRelation relation : relations) {
EntityId target = relation.getOut();
putToQueue(tpi, msg, callbackWrapper, target);
}
}
} catch (RuleNodeException rne) {
msg.getCallback().onFailure(rne);
} catch (Exception e) {
msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
}
}

What's more:

如上面的举例,比如是遥测数据Post telemetry,将会使用​ ​TbMsgTimeseriesNode​ ​​的​ ​onMsg​ ​做进一步的处理,比如存储数据,再通过webSocket进行数据的更新如果有webSocket的session的话,或者其他通知消息,就不详细展开了。

总结:

  1. 处理MQTT的连接其实就是走完了整个规则引擎的逻辑,其他类型的消息,比如遥测数据,属性更新,RPC请求发送与接收,大体流程大同小异;
  2. 在处理消息流向的时候,我们一定要清楚其订阅或者发布的主题是什么,这样我们才不会丢失方向;
  3. Actor的模型就是根据消息的类型,使用AppActor进行一步步的分发,最终交由合适的RuleNode进行处理;
  4. Protobuf类型的消息容易序列化传输与解析,所以在thingsboard中大量使用,但是生成的类可读性不是很高,可以选择直接读queue.proto文件,对类有感性的认知。
    由于作者水平有限,只是梳理了大致的流程,文章难免出现纰漏,望谅解并指正。




一个java项目架构怎么搭 java项目框架图

热门系列:【Java基础巩固系列】Java数据集合,List、Map、Set、JUC,应有尽有【Java基础巩固系列】Java内存溢出和内存泄漏 【Java基础巩固系列】Java类初始化执行顺序 【Java基础巩固系列】Java双亲委派机制理解   程序人生,精彩抢先看 1.前言学习如逆水行舟,不进则退!生活和工作,很多地方我们也需要不断的学习和努力。不少Java同行,可能和之