添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

I am trying to build an application which subscribes to multiple mqtt topics, get the information, process it and form xmls and upon processing trigger an event so that these can be sent to some cloud server and the successful response from there to be sent back to the mqtt channel.

<int-mqtt:message-driven-channel-adapter
    id="mqttAdapter" client-id="${clientId}" url="${brokerUrl}" topics="${topics}"
    channel="startCase" auto-startup="true" />
<int:channel id="startCase" />
<int:service-activator id="startCaseService"
        input-channel="startCase" ref="msgPollingService" method="pollMessages" />
    <bean id="mqttTaskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="5" />
        <property name="maxPoolSize" value="10" />
    </bean>
    <bean id="msgPollingService" class="com.xxxx.xxx.mqttclient.mqtt.MsgPollingService">
        <property name="taskExecutor" ref="mqttTaskExecutor" />
        <property name="vendorId" value="${vendorId}" />
    </bean>

My question is how do I publish this to multiple channels, i.e. if I have an option to publish X message to Y topic. At present I have the below:

<int:channel id="outbound" />
<int-mqtt:outbound-channel-adapter
    id="mqtt-publish" client-id="kj" client-factory="clientFactory"
    auto-startup="true" url="${brokerUrl}" default-qos="0"
    default-retained="true" default-topic="${responseTopic}" channel="outbound" />
    <bean id="eventListner" class="com.xxxx.xxxx.mqttclient.event.EventListener">
        <property name="sccUrl" value="${url}" />
        <property name="restTemplate" ref="restTemplate" />
        <property name="channel" ref="outbound" />
    </bean>

I can publish this like:

channel.send(MessageBuilder.withPayload("customResponse").build());

Can I do something like:

channel.send(Message<?>, topic)

Your configuration looks good. However the MessageChannel is an abstraction for loosely-coupling and gets deal only with Message.

So, you request a-la channel.send(Message<?>, topic) isn't correct for Messaging concepts.

However we have a trick for you. From AbstractMqttMessageHandler:

String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC);
.....
this.publish(topic == null ? this.defaultTopic : topic, mqttMessage, message);

So, what you need from your code is this:

channel.send(MessageBuilder.withPayload("customResponse").setHeader(MqttHeaders.TOPIC, topic).build());

In other words you should send a Message with mqtt_topic header to achieve a dynamic publication from <int-mqtt:outbound-channel-adapter>.

From other side we don't recommend to use MessageChannels directly from the application. The <gateway> with service interface is for such a case for end-application. Where that topic can be one of service method argument marked as @Header(MqttHeaders.TOPIC)

Thanks, yes, gateway is what I'll implements as it seems more appropriate. Also, I may sound naive as I have just started implementing messaging systems, am I correct in defining separate channels for inbound and outbound? – Sandeep B Feb 4, 2015 at 9:06 Yes, that's true. If you aren't going to shift messages from one topic and send them to another you definitely need several channels. – Artem Bilan Feb 4, 2015 at 9:09

Thanks for contributing an answer to Stack Overflow!

  • Please be sure to answer the question. Provide details and share your research!

But avoid

  • Asking for help, clarification, or responding to other answers.
  • Making statements based on opinion; back them up with references or personal experience.

To learn more, see our tips on writing great answers.