I am using Spring's spring-integration-mqtt. I can connect to a single MQTT server and receive messages on the subscribed topics. Now I want to build an application that connects to multiple MQTT servers and receives data from every connection, and I want to manage the connections dynamically so that I can add more MQTT servers from a database or a text file.

A simple bean for a single MQTT subscription connection looks like this:

@Bean
public MessageProducer inbound() {
    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883", "mqtt_virtual_received_sus_2",
                                             "DATA/#", "LD/#", "CONF/#", "CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);
    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;
}

The code above creates a connection to the MQTT server and receives messages. If I copy the same code for a second server with a different MQTT IP address, I can connect to both MQTT servers, as follows:

@Bean
public MessageProducer inbound() {
    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883", "mqtt_virtual_received_sus_2",
                                             "DATA/#", "LD/#", "CONF/#", "CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);
    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;
}

@Bean
public MessageProducer inbound2() {
    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883", "mqtt_virtual_received_sus_1",
                                             "DATA/#", "LD/#", "CONF/#", "CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);
    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;
}

This also works, and I receive messages from both MQTT servers. But is there a way to manage this dynamically? I tried changing the bean's return type to a list, as follows, but it didn't work:

@Bean
public List<MqttPahoMessageDrivenChannelAdapter> getAdapter() {
    List<MqttPahoMessageDrivenChannelAdapter> logConfList = new ArrayList<>();
    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883", "mqtt_virtual_received_sus_2",
                                             "DATA/#", "LD/#", "CONF/#", "CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);
    adapter2.setOutputChannel(mqttInputChannel());
    MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883", "mqtt_virtual_received_sus_1",
                                             "DATA/#", "LD/#", "CONF/#", "CONFIG/#");
    adapter.setCompletionTimeout(0);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(2);
    adapter.setOutputChannel(mqttInputChannel());
    logConfList.add(adapter);
    logConfList.add(adapter2);
    return logConfList;
}

Is there a way to manage these beans dynamically, so that I can fetch the MQTT server details from a text file and set up the connections in a for loop or something similar?

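You can register and remove integration flows at runtime through the IntegrationFlowContext:

// Injected by Spring; manages flows that are registered at runtime.
@Autowired
private IntegrationFlowContext flowContext;

// Builds an adapter for one MQTT server and registers it as a new flow.
private IntegrationFlowRegistration addAnAdapter(String uri, String clientId, MessageChannel channel,
        String... topics) {

    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(uri, clientId, topics);
    // more adapter configuration
    IntegrationFlow flow = IntegrationFlows.from(adapter)
            .channel(channel)
            .get();
    return this.flowContext.registration(flow).register();
}

// Removes a previously registered flow (and its adapter) by registration id.
private void removeAdapter(IntegrationFlowRegistration flowReg) {
    this.flowContext.remove(flowReg.getId());
}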
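To drive addAnAdapter from a text file, the server details first have to be read into some structure. The following is only a sketch of that step using plain Java. The line format "uri,clientId,topic1;topic2", the class name MqttServerConfigParser and the ServerConfig holder are all assumptions made for illustration; nothing in the question fixes them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: not part of spring-integration-mqtt.
class MqttServerConfigParser {

    /** One MQTT server entry: broker URI, client id, and topics to subscribe to. */
    static final class ServerConfig {
        public final String uri;
        public final String clientId;
        public final List<String> topics;

        ServerConfig(String uri, String clientId, List<String> topics) {
            this.uri = uri;
            this.clientId = clientId;
            this.topics = Collections.unmodifiableList(topics);
        }
    }

    /**
     * Parses lines in the assumed format "uri,clientId,topic1;topic2;...".
     * Blank lines and lines starting with '#' are skipped.
     */
    static List<ServerConfig> parse(List<String> lines) {
        List<ServerConfig> configs = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            // Limit of 3 keeps everything after the second comma as the topic list.
            String[] parts = line.split(",", 3);
            if (parts.length != 3) {
                throw new IllegalArgumentException("Expected uri,clientId,topics but got: " + line);
            }
            List<String> topics = new ArrayList<>();
            for (String topic : parts[2].split(";")) {
                String trimmed = topic.trim();
                if (!trimmed.isEmpty()) {
                    topics.add(trimmed);
                }
            }
            configs.add(new ServerConfig(parts[0].trim(), parts[1].trim(), topics));
        }
        return configs;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "# uri,clientId,topics",
                "tcp://192.168.100.1:1883,mqtt_virtual_received_sus_2,DATA/#;LD/#;CONF/#;CONFIG/#",
                "tcp://192.168.100.14:1883,mqtt_virtual_received_sus_1,DATA/#;LD/#;CONF/#;CONFIG/#");
        for (ServerConfig config : parse(lines)) {
            System.out.println(config.uri + " " + config.clientId + " " + config.topics);
        }
    }
}
```

Each parsed entry could then be passed to addAnAdapter in a loop, for example with config.topics.toArray(new String[0]) as the topics argument. Keeping the returned IntegrationFlowRegistration objects in a map keyed by URI would make it possible to call removeAdapter later. Note that each connection to a given broker should use its own client id.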
