RabbitTemplate extending DirectReplyToMessageListenerContainer











up vote
0
down vote

favorite












I would like to use RabbitTemplate#sendAndReceive to send a message to a queue and receive multiple messages, not just one. I wanted to extend this behavior by using my own MessageListenerContainer, but RabbitTemplate seems to instantiate DirectReplyToMessageListenerContainer directly.



Currently, an exception is thrown when 2 messages with the same correlation-id arrive at amq.rabbitmq.reply-to:



org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1569) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1480) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:928) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:917) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2270) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:114) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
... 10 common frames omitted


Is there a different way (if any) to receive multiple responses when using sendAndReceive methods?










share|improve this question


























    up vote
    0
    down vote

    favorite












    I would like to use RabbitTemplate#sendAndReceive to send a message to a queue and receive multiple messages, not just one. I wanted to extend this behavior by using my own MessageListenerContainer, but RabbitTemplate seems to instantiate DirectReplyToMessageListenerContainer directly.



    Currently, an exception is thrown when 2 messages with the same correlation-id arrive at amq.rabbitmq.reply-to:



    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1569) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1480) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:928) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:917) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
    Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
    at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2270) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:114) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    ... 10 common frames omitted


    Is there a different way (if any) to receive multiple responses when using sendAndReceive methods?










    share|improve this question
























      up vote
      0
      down vote

      favorite









      up vote
      0
      down vote

      favorite











      I would like to use RabbitTemplate#sendAndReceive to send a message to a queue and receive multiple messages, not just one. I wanted to extend this behavior by using my own MessageListenerContainer, but RabbitTemplate seems to instantiate DirectReplyToMessageListenerContainer directly.



      Currently, an exception is thrown when 2 messages with the same correlation-id arrive at amq.rabbitmq.reply-to:



      org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1569) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1480) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:928) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:917) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
      at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
      at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
      Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
      at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2270) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:114) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      ... 10 common frames omitted


      Is there a different way (if any) to receive multiple responses when using sendAndReceive methods?










      share|improve this question













      I would like to use RabbitTemplate#sendAndReceive to send a message to a queue and receive multiple messages, not just one. I wanted to extend this behavior by using my own MessageListenerContainer, but RabbitTemplate seems to instantiate DirectReplyToMessageListenerContainer directly.



      Currently, an exception is thrown when 2 messages with the same correlation-id arrive at amq.rabbitmq.reply-to:



      org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1569) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1480) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:928) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:917) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
      at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
      at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
      Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
      at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2270) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:114) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
      ... 10 common frames omitted


      Is there a different way (if any) to receive multiple responses when using sendAndReceive methods?







      java spring spring-amqp






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 8 at 10:41









      alturkovic

      527514




      527514
























          1 Answer
          1






          active

          oldest

          votes

















          up vote
          1
          down vote



          accepted










          It is not designed to do that; you would need to use a RabbitTemplate.send() operation and a stand-alone listener container and you would correlate the replies in your code.



          EDIT



          Here's one way to achieve it (as long as you know how many replies to expect)...



          @SpringBootApplication
          public class So53206036Application {

          public static void main(String args) {
          SpringApplication.run(So53206036Application.class, args);
          }

          @Bean
          public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
          MultiReplyTemplate template = new MultiReplyTemplate();
          template.setConnectionFactory(cf);
          template.setMessageConverter(listConverter());
          return template;
          }

          @Bean
          public ListConverter listConverter() {
          return new ListConverter(new SimpleMessageConverter());
          }

          @RabbitListener(queues = "foo")
          public String listen1(String in) {
          return in.toUpperCase();
          }

          @RabbitListener(queues = "bar")
          public String listen2(String in) {
          return in + in;
          }

          @Bean
          public ApplicationRunner runner(MultiReplyTemplate template) {
          return args -> {
          List<String> reply = template.convertSendAndReceiveAsType("fanout", "", "foo",
          new ParameterizedTypeReference<List<String>>() { });
          System.out.println(reply);
          };
          }

          }

          class MultiReplyTemplate extends RabbitTemplate {

          private static final byte NOBODY = new byte[0];

          private final Map<String, Message> replies = new HashMap<>();

          @Override
          public void onMessage(Message message) {
          // Not thread-safe but that's ok since the DRTMLC is single-threadded.
          String corr = message.getMessageProperties().getCorrelationId();
          Message combined = this.replies.get(corr);
          if (combined == null) {
          combined = new Message(NOBODY, new MessageProperties());
          combined.getMessageProperties().getHeaders().put("replies", new ArrayList<Message>());
          this.replies.put(corr, combined);
          }
          @SuppressWarnings("unchecked")
          List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get("replies");
          list.add(message);
          if (list.size() == 2) {
          this.replies.remove(corr);
          combined.getMessageProperties().setCorrelationId(corr);
          super.onMessage(combined);
          }
          }

          }

          class ListConverter implements SmartMessageConverter {

          private final MessageConverter delegate;

          ListConverter(MessageConverter delegate) {
          this.delegate = delegate;
          }

          @Override
          public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
          return this.delegate.toMessage(object, messageProperties);
          }

          @Override
          public Object fromMessage(Message message) throws MessageConversionException {
          return this.delegate.fromMessage(message); // for listeners
          }

          @Override
          public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
          @SuppressWarnings({ "unchecked" })
          List<Message> list = (List<Message>) message.getMessageProperties().getHeaders().get("replies");
          return list.stream()
          .map(m -> this.delegate.fromMessage(m))
          .collect(Collectors.toList());
          }

          }


          and



          [FOO, foofoo]





          share|improve this answer























          • I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize DirectReplyToMessageListenerContainer that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
            – alturkovic
            Nov 8 at 14:35






          • 1




            It's not the container that doesn't support multiple replies, it's the template's onMessage() method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage() method; accumulate the replies then synthesize a single Message from the replies and call super.onMessage().
            – Gary Russell
            Nov 8 at 14:55










          • I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies and a timeout. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate, thanks!
            – alturkovic
            Nov 8 at 15:02










          • Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>) then use a custom MessageConverter to create your actual reply for the sendAndReceive method.
            – Gary Russell
            Nov 8 at 15:39










          • Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
            – alturkovic
            Nov 8 at 15:43











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














           

          draft saved


          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53206036%2frabbittemplate-extending-directreplytomessagelistenercontainer%23new-answer', 'question_page');
          }
          );

          Post as a guest
































          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes








          up vote
          1
          down vote



          accepted










          It is not designed to do that; you would need to use a RabbitTemplate.send() operation and a stand-alone listener container and you would correlate the replies in your code.



          EDIT



          Here's one way to achieve it (as long as you know how many replies to expect)...



          @SpringBootApplication
          public class So53206036Application {

          public static void main(String args) {
          SpringApplication.run(So53206036Application.class, args);
          }

          @Bean
          public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
          MultiReplyTemplate template = new MultiReplyTemplate();
          template.setConnectionFactory(cf);
          template.setMessageConverter(listConverter());
          return template;
          }

          @Bean
          public ListConverter listConverter() {
          return new ListConverter(new SimpleMessageConverter());
          }

          @RabbitListener(queues = "foo")
          public String listen1(String in) {
          return in.toUpperCase();
          }

          @RabbitListener(queues = "bar")
          public String listen2(String in) {
          return in + in;
          }

          @Bean
          public ApplicationRunner runner(MultiReplyTemplate template) {
          return args -> {
          List<String> reply = template.convertSendAndReceiveAsType("fanout", "", "foo",
          new ParameterizedTypeReference<List<String>>() { });
          System.out.println(reply);
          };
          }

          }

          class MultiReplyTemplate extends RabbitTemplate {

          private static final byte NOBODY = new byte[0];

          private final Map<String, Message> replies = new HashMap<>();

          @Override
          public void onMessage(Message message) {
          // Not thread-safe but that's ok since the DRTMLC is single-threadded.
          String corr = message.getMessageProperties().getCorrelationId();
          Message combined = this.replies.get(corr);
          if (combined == null) {
          combined = new Message(NOBODY, new MessageProperties());
          combined.getMessageProperties().getHeaders().put("replies", new ArrayList<Message>());
          this.replies.put(corr, combined);
          }
          @SuppressWarnings("unchecked")
          List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get("replies");
          list.add(message);
          if (list.size() == 2) {
          this.replies.remove(corr);
          combined.getMessageProperties().setCorrelationId(corr);
          super.onMessage(combined);
          }
          }

          }

          class ListConverter implements SmartMessageConverter {

          private final MessageConverter delegate;

          ListConverter(MessageConverter delegate) {
          this.delegate = delegate;
          }

          @Override
          public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
          return this.delegate.toMessage(object, messageProperties);
          }

          @Override
          public Object fromMessage(Message message) throws MessageConversionException {
          return this.delegate.fromMessage(message); // for listeners
          }

          @Override
          public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
          @SuppressWarnings({ "unchecked" })
          List<Message> list = (List<Message>) message.getMessageProperties().getHeaders().get("replies");
          return list.stream()
          .map(m -> this.delegate.fromMessage(m))
          .collect(Collectors.toList());
          }

          }


          and



          [FOO, foofoo]





          share|improve this answer























          • I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize DirectReplyToMessageListenerContainer that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
            – alturkovic
            Nov 8 at 14:35






          • 1




            It's not the container that doesn't support multiple replies, it's the template's onMessage() method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage() method; accumulate the replies then synthesize a single Message from the replies and call super.onMessage().
            – Gary Russell
            Nov 8 at 14:55










          • I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies and a timeout. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate, thanks!
            – alturkovic
            Nov 8 at 15:02










          • Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>) then use a custom MessageConverter to create your actual reply for the sendAndReceive method.
            – Gary Russell
            Nov 8 at 15:39










          • Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
            – alturkovic
            Nov 8 at 15:43















          up vote
          1
          down vote



          accepted










          It is not designed to do that; you would need to use a RabbitTemplate.send() operation and a stand-alone listener container and you would correlate the replies in your code.



          EDIT



          Here's one way to achieve it (as long as you know how many replies to expect)...



          @SpringBootApplication
          public class So53206036Application {

          public static void main(String args) {
          SpringApplication.run(So53206036Application.class, args);
          }

          @Bean
          public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
          MultiReplyTemplate template = new MultiReplyTemplate();
          template.setConnectionFactory(cf);
          template.setMessageConverter(listConverter());
          return template;
          }

          @Bean
          public ListConverter listConverter() {
          return new ListConverter(new SimpleMessageConverter());
          }

          @RabbitListener(queues = "foo")
          public String listen1(String in) {
          return in.toUpperCase();
          }

          @RabbitListener(queues = "bar")
          public String listen2(String in) {
          return in + in;
          }

          @Bean
          public ApplicationRunner runner(MultiReplyTemplate template) {
          return args -> {
          List<String> reply = template.convertSendAndReceiveAsType("fanout", "", "foo",
          new ParameterizedTypeReference<List<String>>() { });
          System.out.println(reply);
          };
          }

          }

          class MultiReplyTemplate extends RabbitTemplate {

          private static final byte NOBODY = new byte[0];

          private final Map<String, Message> replies = new HashMap<>();

          @Override
          public void onMessage(Message message) {
          // Not thread-safe but that's ok since the DRTMLC is single-threadded.
          String corr = message.getMessageProperties().getCorrelationId();
          Message combined = this.replies.get(corr);
          if (combined == null) {
          combined = new Message(NOBODY, new MessageProperties());
          combined.getMessageProperties().getHeaders().put("replies", new ArrayList<Message>());
          this.replies.put(corr, combined);
          }
          @SuppressWarnings("unchecked")
          List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get("replies");
          list.add(message);
          if (list.size() == 2) {
          this.replies.remove(corr);
          combined.getMessageProperties().setCorrelationId(corr);
          super.onMessage(combined);
          }
          }

          }

          class ListConverter implements SmartMessageConverter {

          private final MessageConverter delegate;

          ListConverter(MessageConverter delegate) {
          this.delegate = delegate;
          }

          @Override
          public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
          return this.delegate.toMessage(object, messageProperties);
          }

          @Override
          public Object fromMessage(Message message) throws MessageConversionException {
          return this.delegate.fromMessage(message); // for listeners
          }

          @Override
          public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
          @SuppressWarnings({ "unchecked" })
          List<Message> list = (List<Message>) message.getMessageProperties().getHeaders().get("replies");
          return list.stream()
          .map(m -> this.delegate.fromMessage(m))
          .collect(Collectors.toList());
          }

          }


          and



          [FOO, foofoo]





          share|improve this answer























          • I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize DirectReplyToMessageListenerContainer that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
            – alturkovic
            Nov 8 at 14:35






          • 1




            It's not the container that doesn't support multiple replies, it's the template's onMessage() method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage() method; accumulate the replies then synthesize a single Message from the replies and call super.onMessage().
            – Gary Russell
            Nov 8 at 14:55










          • I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies and a timeout. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate, thanks!
            – alturkovic
            Nov 8 at 15:02










          • Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>) then use a custom MessageConverter to create your actual reply for the sendAndReceive method.
            – Gary Russell
            Nov 8 at 15:39










          • Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
            – alturkovic
            Nov 8 at 15:43













          up vote
          1
          down vote



          accepted







          up vote
          1
          down vote



          accepted






          It is not designed to do that; you would need to use a RabbitTemplate.send() operation and a stand-alone listener container and you would correlate the replies in your code.



          EDIT



          Here's one way to achieve it (as long as you know how many replies to expect)...



          @SpringBootApplication
          public class So53206036Application {

          public static void main(String args) {
          SpringApplication.run(So53206036Application.class, args);
          }

          @Bean
          public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
          MultiReplyTemplate template = new MultiReplyTemplate();
          template.setConnectionFactory(cf);
          template.setMessageConverter(listConverter());
          return template;
          }

          @Bean
          public ListConverter listConverter() {
          return new ListConverter(new SimpleMessageConverter());
          }

          @RabbitListener(queues = "foo")
          public String listen1(String in) {
          return in.toUpperCase();
          }

          @RabbitListener(queues = "bar")
          public String listen2(String in) {
          return in + in;
          }

          @Bean
          public ApplicationRunner runner(MultiReplyTemplate template) {
          return args -> {
          List<String> reply = template.convertSendAndReceiveAsType("fanout", "", "foo",
          new ParameterizedTypeReference<List<String>>() { });
          System.out.println(reply);
          };
          }

          }

          class MultiReplyTemplate extends RabbitTemplate {

          private static final byte NOBODY = new byte[0];

          private final Map<String, Message> replies = new HashMap<>();

          @Override
          public void onMessage(Message message) {
          // Not thread-safe but that's ok since the DRTMLC is single-threadded.
          String corr = message.getMessageProperties().getCorrelationId();
          Message combined = this.replies.get(corr);
          if (combined == null) {
          combined = new Message(NOBODY, new MessageProperties());
          combined.getMessageProperties().getHeaders().put("replies", new ArrayList<Message>());
          this.replies.put(corr, combined);
          }
          @SuppressWarnings("unchecked")
          List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get("replies");
          list.add(message);
          if (list.size() == 2) {
          this.replies.remove(corr);
          combined.getMessageProperties().setCorrelationId(corr);
          super.onMessage(combined);
          }
          }

          }

          class ListConverter implements SmartMessageConverter {

          private final MessageConverter delegate;

          ListConverter(MessageConverter delegate) {
          this.delegate = delegate;
          }

          @Override
          public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
          return this.delegate.toMessage(object, messageProperties);
          }

          @Override
          public Object fromMessage(Message message) throws MessageConversionException {
          return this.delegate.fromMessage(message); // for listeners
          }

          @Override
          public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
          @SuppressWarnings({ "unchecked" })
          List<Message> list = (List<Message>) message.getMessageProperties().getHeaders().get("replies");
          return list.stream()
          .map(m -> this.delegate.fromMessage(m))
          .collect(Collectors.toList());
          }

          }


          and



          [FOO, foofoo]





          share|improve this answer














          It is not designed to do that; you would need to use a RabbitTemplate.send() operation and a stand-alone listener container and you would correlate the replies in your code.



          EDIT



          Here's one way to achieve it (as long as you know how many replies to expect)...



          @SpringBootApplication
          public class So53206036Application {

          public static void main(String args) {
          SpringApplication.run(So53206036Application.class, args);
          }

          @Bean
          public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
          MultiReplyTemplate template = new MultiReplyTemplate();
          template.setConnectionFactory(cf);
          template.setMessageConverter(listConverter());
          return template;
          }

          @Bean
          public ListConverter listConverter() {
          return new ListConverter(new SimpleMessageConverter());
          }

          @RabbitListener(queues = "foo")
          public String listen1(String in) {
          return in.toUpperCase();
          }

          @RabbitListener(queues = "bar")
          public String listen2(String in) {
          return in + in;
          }

          @Bean
          public ApplicationRunner runner(MultiReplyTemplate template) {
          return args -> {
          List<String> reply = template.convertSendAndReceiveAsType("fanout", "", "foo",
          new ParameterizedTypeReference<List<String>>() { });
          System.out.println(reply);
          };
          }

          }

          class MultiReplyTemplate extends RabbitTemplate {

          private static final byte NOBODY = new byte[0];

          private final Map<String, Message> replies = new HashMap<>();

          @Override
          public void onMessage(Message message) {
          // Not thread-safe but that's ok since the DRTMLC is single-threadded.
          String corr = message.getMessageProperties().getCorrelationId();
          Message combined = this.replies.get(corr);
          if (combined == null) {
          combined = new Message(NOBODY, new MessageProperties());
          combined.getMessageProperties().getHeaders().put("replies", new ArrayList<Message>());
          this.replies.put(corr, combined);
          }
          @SuppressWarnings("unchecked")
          List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get("replies");
          list.add(message);
          if (list.size() == 2) {
          this.replies.remove(corr);
          combined.getMessageProperties().setCorrelationId(corr);
          super.onMessage(combined);
          }
          }

          }

          class ListConverter implements SmartMessageConverter {

          private final MessageConverter delegate;

          ListConverter(MessageConverter delegate) {
          this.delegate = delegate;
          }

          @Override
          public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
          return this.delegate.toMessage(object, messageProperties);
          }

          @Override
          public Object fromMessage(Message message) throws MessageConversionException {
          return this.delegate.fromMessage(message); // for listeners
          }

          @Override
          public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
          @SuppressWarnings({ "unchecked" })
          List<Message> list = (List<Message>) message.getMessageProperties().getHeaders().get("replies");
          return list.stream()
          .map(m -> this.delegate.fromMessage(m))
          .collect(Collectors.toList());
          }

          }


          and



          [FOO, foofoo]






          share|improve this answer














          share|improve this answer



          share|improve this answer








          edited Nov 8 at 16:35

























          answered Nov 8 at 14:26









          Gary Russell

          76.2k64166




          76.2k64166












          • I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize DirectReplyToMessageListenerContainer that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
            – alturkovic
            Nov 8 at 14:35






          • 1




            It's not the container that doesn't support multiple replies, it's the template's onMessage() method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage() method; accumulate the replies then synthesize a single Message from the replies and call super.onMessage().
            – Gary Russell
            Nov 8 at 14:55










          • I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies and a timeout. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate, thanks!
            – alturkovic
            Nov 8 at 15:02










          • Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>) then use a custom MessageConverter to create your actual reply for the sendAndReceive method.
            – Gary Russell
            Nov 8 at 15:39










          • Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
            – alturkovic
            Nov 8 at 15:43


















          • I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize DirectReplyToMessageListenerContainer that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
            – alturkovic
            Nov 8 at 14:35






          • 1




            It's not the container that doesn't support multiple replies, it's the template's onMessage() method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage() method; accumulate the replies then synthesize a single Message from the replies and call super.onMessage().
            – Gary Russell
            Nov 8 at 14:55










          • I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies and a timeout. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate, thanks!
            – alturkovic
            Nov 8 at 15:02










          • Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>) then use a custom MessageConverter to create your actual reply for the sendAndReceive method.
            – Gary Russell
            Nov 8 at 15:39










          • Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
            – alturkovic
            Nov 8 at 15:43
















          I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize DirectReplyToMessageListenerContainer that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
          – alturkovic
          Nov 8 at 14:35




          I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize DirectReplyToMessageListenerContainer that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
          – alturkovic
          Nov 8 at 14:35




          1




          1




          It's not the container that doesn't support multiple replies, it's the template's onMessage() method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage() method; accumulate the replies then synthesize a single Message from the replies and call super.onMessage().
          – Gary Russell
          Nov 8 at 14:55




          It's not the container that doesn't support multiple replies, it's the template's onMessage() method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage() method; accumulate the replies then synthesize a single Message from the replies and call super.onMessage().
          – Gary Russell
          Nov 8 at 14:55












          I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies and a timeout. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate, thanks!
          – alturkovic
          Nov 8 at 15:02




          I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies and a timeout. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate, thanks!
          – alturkovic
          Nov 8 at 15:02












          Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>) then use a custom MessageConverter to create your actual reply for the sendAndReceive method.
          – Gary Russell
          Nov 8 at 15:39




          Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>) then use a custom MessageConverter to create your actual reply for the sendAndReceive method.
          – Gary Russell
          Nov 8 at 15:39












          Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
          – alturkovic
          Nov 8 at 15:43




          Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
          – alturkovic
          Nov 8 at 15:43


















           

          draft saved


          draft discarded



















































           


          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53206036%2frabbittemplate-extending-directreplytomessagelistenercontainer%23new-answer', 'question_page');
          }
          );

          Post as a guest




















































































          Popular posts from this blog

          Schultheiß

          Verwaltungsgliederung Dänemarks

          Liste der Kulturdenkmale in Wilsdruff