I'm configuring Spring Integration to use cleanSession=false on one of my channels.

<bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        <property name="cleanSession" value="false" />
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttLiveDataInbound"
    client-id="client1"
    url="${mqtt.broker.url}"
    qos="1"
    topics="liveData"
    client-factory="clientFactory"
    channel="channelLiveData"/> 

The reason is that I don't want to lose messages published while my application is offline: when it restarts, it should receive any QoS>0 messages that were published in the meantime.

However, I noticed something odd: my application doesn't pick up the missed QoS>0 messages after downtime.

I've logged a simple scenario where

  • Spring Integration starts
  • A QoS1 message is sent to the topic and received by Spring Integration
  • Spring Integration exits
  • A QoS1 message is sent to the topic
  • Spring Integration starts
  • Spring Integration does not receive the QoS1 message that was sent while it was offline.

The reason, as can be seen from the logs below, is what happens when Spring Integration exits:

  • it unsubscribes from the topic
  • it disconnects the client

The unsubscribe essentially tells the broker that this client is no longer interested in these messages, so while my app is down the broker no longer persists QoS>0 messages for it. When my app starts up again, it fails to receive the QoS>0 messages that were published while it was down.

    1448917620: New connection from 127.0.0.1 on port 1883.
    1448917620: New client connected from 127.0.0.1 as client1 (c0, k60).
    1448917620: Sending CONNACK to client1 (0, 0)
    1448917620: Received SUBSCRIBE from client1
    1448917620:     liveData (QoS 1)
    1448917620: Sending SUBACK to client1
    1448917632: New connection from ::1 on port 1883.
    1448917632: New client connected from ::1 as mosqpub/25936-MacBook-P (c1, k60, u'system').
    1448917632: Sending CONNACK to mosqpub/25936-MacBook-P (0, 0)
    1448917632: Received PUBLISH from mosqpub/25936-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
    1448917632: Sending PUBACK to mosqpub/25936-MacBook-P (Mid: 1)
    1448917632: Sending PUBLISH to client1 (d0, q1, r0, m1, 'liveData', ... (68 bytes))
    1448917632: Received DISCONNECT from mosqpub/25936-MacBook-P
    1448917632: Client mosqpub/25936-MacBook-P disconnected.
    1448917633: Received PUBACK from client1 (Mid: 1)
    1448917643: Received UNSUBSCRIBE from client1
    1448917643:     liveData
    1448917643: Received DISCONNECT from client1
    1448917643: Client client1 disconnected.
    1448917648: New connection from ::1 on port 1883.
    1448917648: New client connected from ::1 as mosqpub/25945-MacBook-P (c1, k60, u'system').
    1448917648: Sending CONNACK to mosqpub/25945-MacBook-P (0, 0)
    1448917648: Received PUBLISH from mosqpub/25945-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
    1448917648: Sending PUBACK to mosqpub/25945-MacBook-P (Mid: 1)
    1448917648: Received DISCONNECT from mosqpub/25945-MacBook-P
    1448917648: Client mosqpub/25945-MacBook-P disconnected.
    1448917665: New connection from 127.0.0.1 on port 1883.
    1448917665: Client client1 disconnected.
    1448917665: New client connected from 127.0.0.1 as client1 (c0, k60).
    1448917665: Sending CONNACK to client1 (1, 0)
    1448917665: Received SUBSCRIBE from client1
    1448917665:     liveData (QoS 1)
    1448917665: Sending SUBACK to client1
    

I ran the same scenario using the mosquitto client tools. There, exiting the mosquitto subscriber disconnects the client but does not unsubscribe from the topic, and the message published in the meantime is delivered on reconnect:

    1448917534: New connection from ::1 on port 1883.
    1448917534: New client connected from ::1 as client1 (c0, k60).
    1448917534: Sending CONNACK to client1 (0, 0)
    1448917534: Received SUBSCRIBE from client1
    1448917534:     liveData (QoS 1)
    1448917534: Sending SUBACK to client1
    1448917550: New connection from ::1 on port 1883.
    1448917550: New client connected from ::1 as mosqpub/25879-MacBook-P (c1, k60, u'system').
    1448917550: Sending CONNACK to mosqpub/25879-MacBook-P (0, 0)
    1448917550: Received PUBLISH from mosqpub/25879-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
    1448917550: Sending PUBACK to mosqpub/25879-MacBook-P (Mid: 1)
    1448917550: Sending PUBLISH to client1 (d0, q1, r0, m1, 'liveData', ... (68 bytes))
    1448917550: Received DISCONNECT from mosqpub/25879-MacBook-P
    1448917550: Client mosqpub/25879-MacBook-P disconnected.
    1448917550: Received PUBACK from client1 (Mid: 1)
    1448917553: Socket error on client client1, disconnecting.
    1448917554: New connection from ::1 on port 1883.
    1448917554: New client connected from ::1 as mosqpub/25884-MacBook-P (c1, k60, u'system').
    1448917554: Sending CONNACK to mosqpub/25884-MacBook-P (0, 0)
    1448917554: Received PUBLISH from mosqpub/25884-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
    1448917554: Sending PUBACK to mosqpub/25884-MacBook-P (Mid: 1)
    1448917554: Received DISCONNECT from mosqpub/25884-MacBook-P
    1448917555: Client mosqpub/25884-MacBook-P disconnected.
    1448917556: New connection from ::1 on port 1883.
    1448917556: Client client1 disconnected.
    1448917556: New client connected from ::1 as client1 (c0, k60).
    1448917556: Sending CONNACK to client1 (0, 0)
    1448917556: Sending PUBLISH to client1 (d0, q1, r0, m2, 'liveData', ... (68 bytes))
    1448917556: Received SUBSCRIBE from client1
    1448917556:     liveData (QoS 1)
    1448917556: Sending SUBACK to client1
    1448917556: Received PUBACK from client1 (Mid: 2)
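
To make the difference between the two logs concrete, here is a toy model of the bookkeeping a broker does for a cleanSession=false client. It is only a sketch under my own assumptions, not how Mosquitto is actually implemented: the class `ToyBroker` and all of its methods are hypothetical, delivery to connected clients is left out, and the payload string is made up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of persistent-session bookkeeping; hypothetical, not a real broker. */
class ToyBroker {

    /** State kept per client id across connections (cleanSession=false). */
    private static final class Session {
        final Set<String> topics = new HashSet<>();
        final Deque<String> queued = new ArrayDeque<>();
        boolean connected;
    }

    private final Map<String, Session> sessions = new HashMap<>();

    /** Connects a client and returns whatever was queued while it was away. */
    List<String> connect(String clientId) {
        Session s = sessions.computeIfAbsent(clientId, id -> new Session());
        s.connected = true;
        List<String> backlog = new ArrayList<>(s.queued);
        s.queued.clear();
        return backlog;
    }

    void subscribe(String clientId, String topic) {
        sessions.get(clientId).topics.add(topic);
    }

    /** Removes the subscription from the session, so nothing is queued for it afterwards. */
    void unsubscribe(String clientId, String topic) {
        sessions.get(clientId).topics.remove(topic);
    }

    /** Marks the client offline; the session and its subscriptions are kept. */
    void disconnect(String clientId) {
        sessions.get(clientId).connected = false;
    }

    /** QoS 1 publish: queue for every offline session that still holds the subscription. */
    void publish(String topic, String payload) {
        for (Session s : sessions.values()) {
            if (!s.connected && s.topics.contains(topic)) {
                s.queued.add(payload);
            }
        }
    }

    public static void main(String[] args) {
        // First log: the client sends UNSUBSCRIBE, then DISCONNECT.
        ToyBroker a = new ToyBroker();
        a.connect("client1");
        a.subscribe("client1", "liveData");
        a.unsubscribe("client1", "liveData");
        a.disconnect("client1");
        a.publish("liveData", "sent while offline");
        System.out.println("unsubscribe + disconnect: " + a.connect("client1"));

        // Second log: the connection goes away, the subscription stays.
        ToyBroker b = new ToyBroker();
        b.connect("client1");
        b.subscribe("client1", "liveData");
        b.disconnect("client1");
        b.publish("liveData", "sent while offline");
        System.out.println("disconnect only: " + b.connect("client1"));
    }
}
```

In this model the first reconnect returns an empty backlog and the second returns the message that was published while the client was offline, which matches what the two broker logs show.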
    

Any idea how to deal with this situation?

EDIT:

When implementing the workaround proposed in the accepted answer, I get the following error. My Spring context is loaded from a webapp. I've tried putting the IgnoreUnsubscribePahoClientFactory in a separate JAR (same level as spring-integration / paho) as well as in the webapp classes themselves.

    2015-12-02 15:47:43,703 ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.aop.framework.AopConfigException: Could not generate CGLIB subclass of class [class org.eclipse.paho.client.mqttv3.MqttAsyncClient]: Common causes of this problem include using a final class or a non-visible class; nested exception is org.springframework.cglib.core.CodeGenerationException: java.lang.reflect.InvocationTargetException-->null
            at org.springframework.aop.framework.CglibAopProxy.getProxy(CglibAopProxy.java:206)
            at org.springframework.aop.framework.ProxyFactoryBean.getProxy(ProxyFactoryBean.java:368)
            at org.springframework.aop.framework.ProxyFactoryBean.getSingletonInstance(ProxyFactoryBean.java:322)
            at org.springframework.aop.framework.ProxyFactoryBean.getObject(ProxyFactoryBean.java:246)
            at com.ecs.vdm.rest.integration.IgnoreUnsubscribePahoClientFactory.proxy(IgnoreUnsubscribePahoClientFactory.java:62)
            at com.ecs.vdm.rest.integration.IgnoreUnsubscribePahoClientFactory.getAsyncClientInstance(IgnoreUnsubscribePahoClientFactory.java:43)
            at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe(MqttPahoMessageDrivenChannelAdapter.java:216)
            at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.access$300(MqttPahoMessageDrivenChannelAdapter.java:45)
            at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter$1.run(MqttPahoMessageDrivenChannelAdapter.java:272)
            at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: org.springframework.cglib.core.CodeGenerationException: java.lang.reflect.InvocationTargetException-->null
            at org.springframework.cglib.core.AbstractClassGenerator.create(AbstractClassGenerator.java:237)
            at org.springframework.cglib.proxy.Enhancer.createHelper(Enhancer.java:377)
            at org.springframework.cglib.proxy.Enhancer.createClass(Enhancer.java:317)
            at org.springframework.aop.framework.ObjenesisCglibAopProxy.createProxyClassAndInstance(ObjenesisCglibAopProxy.java:57)
            at org.springframework.aop.framework.CglibAopProxy.getProxy(CglibAopProxy.java:202)
            ... 16 more
    Caused by: java.lang.reflect.InvocationTargetException
            at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:606)
            at org.springframework.cglib.core.ReflectUtils.defineClass(ReflectUtils.java:384)
            at org.springframework.cglib.core.AbstractClassGenerator.create(AbstractClassGenerator.java:219)
            ... 20 more
    Caused by: java.lang.SecurityException: class "org.eclipse.paho.client.mqttv3.MqttAsyncClient$$EnhancerBySpringCGLIB$$d14754a9_4603"'s signer information does not match signer information of other classes in the same package
            at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)
            at java.lang.ClassLoader.preDefineClass(ClassLoader.java:666)
            at java.lang.ClassLoader.defineClass(ClassLoader.java:794)
            ... 25 more
    

It's a bug: the adapter unconditionally unsubscribes during stop().

I don't see a simple workaround, but I have a couple of ideas; I'll post here if/when I have something.

In the meantime, please open a JIRA Issue.

Gist Here

It's a bit of a sledgehammer, but it should work for you; it effectively ignores the call to unsubscribe on the client. It could be made a little more sophisticated, to only ignore the call when the QoS is > 0, but that would be quite a bit more involved.

If you're already using the DefaultMqttPahoClientFactory, just change the bean class to this one. If you're not currently using a factory, declare it as a bean and provide it to the adapter using the client-factory attribute.
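
For readers who can't open the gist, the general idea of "ignore the call to unsubscribe" can be sketched with a plain JDK dynamic proxy. This is not the gist's code: `MqttLikeClient`, `RecordingClient` and `ignoringUnsubscribe` are hypothetical names, and the interface is a cut-down stand-in for the real client interface, which has many more methods and non-void return types that a real wrapper would have to handle. Because a JDK proxy is generated against an interface instead of as a subclass in the target's package, it may also sidestep signer checks like the one in the stack trace above, but that is an assumption and untested here.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class IgnoreUnsubscribeSketch {

    /** Hypothetical stand-in for the MQTT client interface. */
    interface MqttLikeClient {
        void subscribe(String topic, int qos);
        void unsubscribe(String topic);
        void disconnect();
    }

    /** Records the calls that reach the underlying client so the effect is observable. */
    static final class RecordingClient implements MqttLikeClient {
        final List<String> calls = new ArrayList<>();

        @Override
        public void subscribe(String topic, int qos) {
            calls.add("SUBSCRIBE " + topic);
        }

        @Override
        public void unsubscribe(String topic) {
            calls.add("UNSUBSCRIBE " + topic);
        }

        @Override
        public void disconnect() {
            calls.add("DISCONNECT");
        }
    }

    /** Returns a proxy that drops unsubscribe() and forwards every other call to the target. */
    static MqttLikeClient ignoringUnsubscribe(MqttLikeClient target) {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("unsubscribe".equals(method.getName())) {
                System.out.println("Skipping unsubscribe.");
                return null; // the method is void in this sketch
            }
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the target's own exception
            }
        };
        return (MqttLikeClient) Proxy.newProxyInstance(
                MqttLikeClient.class.getClassLoader(),
                new Class<?>[] {MqttLikeClient.class},
                handler);
    }

    public static void main(String[] args) {
        RecordingClient real = new RecordingClient();
        MqttLikeClient client = ignoringUnsubscribe(real);
        client.subscribe("liveData", 1);
        client.unsubscribe("liveData"); // swallowed by the proxy
        client.disconnect();
        System.out.println(real.calls);
    }
}
```

With the wrapper in place, only the subscribe and disconnect calls reach the underlying client, so the broker would keep the subscription in the persistent session.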

We'll fix it properly in an upcoming release.

Thx for the reply. Created jira.spring.io/browse/INT-3900 (currently set to minor, but it seems I can't edit it once created). Feel free to up the prio :) – ddewaele Dec 1, 2015 at 4:50

Can I work around the cglib security issues with this gist? (signer information does not match signer information of other classes in the same package) Or do I need to build my own version of spring-integration? – ddewaele Dec 1, 2015 at 7:35

Sorry; no; I tested it there, but it doesn't have to be in that package - just put it in a package in your app. – Gary Russell Dec 1, 2015 at 14:06

Strange - it works fine for me - can you show the complete stack trace? (edit the question, don't try to paste it in a comment). 10:14:52.105 WARN [main][com.foo.IgnoreUnsubscribePahoClientFactory] Skipping unsubscribe. – Gary Russell Dec 1, 2015 at 15:12

Interesting; there's a similar issue discussed here which seems to be related to having a jar on the classpath twice. Perhaps the paho jar? If you can't get to the bottom of it, I have an open PR for the fix. It should be merged early next week when my reviewer returns to work. We can build the 4.2.3 release soon thereafter. In the meantime, if you want to test with it, you can pull down my branch and build it locally. – Gary Russell Dec 3, 2015 at 17:42
