RabbitTemplate extending DirectReplyToMessageListenerContainer
up vote
0
down vote
favorite
I would like to use RabbitTemplate#sendAndReceive to send a message to a queue and receive multiple messages, not just one. I wanted to extend this behavior by using my own MessageListenerContainer
, but RabbitTemplate
seems to instantiate DirectReplyToMessageListenerContainer
directly.
Currently, an exception is thrown when 2 messages with the same correlation-id arrive at amq.rabbitmq.reply-to
:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1569) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1480) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:928) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:917) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2270) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:114) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
... 10 common frames omitted
Is there a different way (if any) to receive multiple responses when using sendAndReceive
methods?
java spring spring-amqp
add a comment |
up vote
0
down vote
favorite
I would like to use RabbitTemplate#sendAndReceive to send a message to a queue and receive multiple messages, not just one. I wanted to extend this behavior by using my own MessageListenerContainer
, but RabbitTemplate
seems to instantiate DirectReplyToMessageListenerContainer
directly.
Currently, an exception is thrown when 2 messages with the same correlation-id arrive at amq.rabbitmq.reply-to
:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1569) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1480) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:928) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:917) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2270) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:114) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
... 10 common frames omitted
Is there a different way (if any) to receive multiple responses when using sendAndReceive
methods?
java spring spring-amqp
add a comment |
up vote
0
down vote
favorite
up vote
0
down vote
favorite
I would like to use RabbitTemplate#sendAndReceive to send a message to a queue and receive multiple messages, not just one. I wanted to extend this behavior by using my own MessageListenerContainer
, but RabbitTemplate
seems to instantiate DirectReplyToMessageListenerContainer
directly.
Currently, an exception is thrown when 2 messages with the same correlation-id arrive at amq.rabbitmq.reply-to
:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1569) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1480) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:928) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:917) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2270) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:114) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
... 10 common frames omitted
Is there a different way (if any) to receive multiple responses when using sendAndReceive
methods?
java spring spring-amqp
I would like to use RabbitTemplate#sendAndReceive to send a message to a queue and receive multiple messages, not just one. I wanted to extend this behavior by using my own MessageListenerContainer
, but RabbitTemplate
seems to instantiate DirectReplyToMessageListenerContainer
directly.
Currently, an exception is thrown when 2 messages with the same correlation-id arrive at amq.rabbitmq.reply-to
:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1569) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1480) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:928) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:917) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2270) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:114) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
... 10 common frames omitted
Is there a different way (if any) to receive multiple responses when using sendAndReceive
methods?
java spring spring-amqp
java spring spring-amqp
asked Nov 8 at 10:41
alturkovic
527514
527514
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
up vote
1
down vote
accepted
It is not designed to do that; you would need to use a RabbitTemplate.send()
operation and a stand-alone listener container and you would correlate the replies in your code.
EDIT
Here's one way to achieve it (as long as you know how many replies to expect)...
/**
 * Demo application for collecting several replies to a single request.
 *
 * <p>The runner sends one message to the exchange named {@code fanout}; the two
 * listeners below each return a reply, and the template hands both back to the
 * caller as one {@code List<String>} (presumably the {@code foo} and {@code bar}
 * queues are both bound to that exchange - the bindings are not declared here).
 */
@SpringBootApplication
public class So53206036Application {

    public static void main(String[] args) {
        SpringApplication.run(So53206036Application.class, args);
    }

    /** Builds the multi-reply template and installs the list-aware converter on it. */
    @Bean
    public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
        MultiReplyTemplate template = new MultiReplyTemplate();
        template.setConnectionFactory(cf);
        template.setMessageConverter(listConverter());
        return template;
    }

    /** Converter wrapping a {@link SimpleMessageConverter} for the individual payloads. */
    @Bean
    public ListConverter listConverter() {
        return new ListConverter(new SimpleMessageConverter());
    }

    /** Replies with the upper-cased request. */
    @RabbitListener(queues = "foo")
    public String listen1(String in) {
        return in.toUpperCase();
    }

    /** Replies with the request concatenated with itself. */
    @RabbitListener(queues = "bar")
    public String listen2(String in) {
        return in + in;
    }

    /** Sends {@code "foo"} with an empty routing key at startup and prints the combined reply. */
    @Bean
    public ApplicationRunner runner(MultiReplyTemplate template) {
        return args -> {
            List<String> reply = template.convertSendAndReceiveAsType("fanout", "", "foo",
                    new ParameterizedTypeReference<List<String>>() { });
            System.out.println(reply);
        };
    }

}
/**
 * {@link RabbitTemplate} that holds back replies sharing a correlation id until the
 * expected number has arrived, then passes the parent a single synthetic message whose
 * {@code "replies"} header carries the collected reply messages.
 */
class MultiReplyTemplate extends RabbitTemplate {

    /** Reply count used by the no-arg constructor. */
    private static final int DEFAULT_EXPECTED_REPLIES = 2;

    /** Empty body for the synthetic combined message; the payloads live in the header. */
    private static final byte[] NOBODY = new byte[0];

    /** Partially collected replies, keyed by correlation id. */
    private final Map<String, Message> replies = new HashMap<>();

    /** Number of replies that completes one request. */
    private final int expectedReplies;

    /** Creates a template that waits for two replies per request. */
    MultiReplyTemplate() {
        this(DEFAULT_EXPECTED_REPLIES);
    }

    /**
     * Creates a template that waits for the given number of replies per request.
     *
     * @param expectedReplies replies to collect before completing; must be at least 1
     * @throws IllegalArgumentException if {@code expectedReplies} is less than 1
     */
    MultiReplyTemplate(int expectedReplies) {
        if (expectedReplies < 1) {
            throw new IllegalArgumentException("expectedReplies must be >= 1, was " + expectedReplies);
        }
        this.expectedReplies = expectedReplies;
    }

    /**
     * Stores the reply under its correlation id and, once enough replies are present,
     * forwards one combined message to {@link RabbitTemplate#onMessage(Message)}.
     *
     * @param message a single reply
     */
    @Override
    public void onMessage(Message message) {
        // Not thread-safe but that's ok since the DRTMLC is single-threaded.
        // NOTE: an entry is only removed once all expected replies arrive; replies for a
        // request that never completes stay in the map.
        String corr = message.getMessageProperties().getCorrelationId();
        Message combined = this.replies.get(corr);
        if (combined == null) {
            combined = new Message(NOBODY, new MessageProperties());
            combined.getMessageProperties().getHeaders().put("replies", new ArrayList<Message>());
            this.replies.put(corr, combined);
        }
        @SuppressWarnings("unchecked")
        List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get("replies");
        list.add(message);
        if (list.size() >= this.expectedReplies) {
            this.replies.remove(corr);
            // The parent matches the reply to its request via the correlation id.
            combined.getMessageProperties().setCorrelationId(corr);
            super.onMessage(combined);
        }
    }

}
/**
 * {@link SmartMessageConverter} that turns a combined message (one whose
 * {@code "replies"} header holds a {@code List<Message>}) into a list of converted
 * payloads. All other conversions are passed straight to the delegate.
 */
class ListConverter implements SmartMessageConverter {

    /** Converter used for each individual message. */
    private final MessageConverter delegate;

    ListConverter(MessageConverter delegate) {
        this.delegate = delegate;
    }

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return this.delegate.toMessage(object, messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return this.delegate.fromMessage(message); // for listeners
    }

    /**
     * Converts every message stored in the {@code "replies"} header and returns the results
     * in order. The conversion hint is not used.
     *
     * @return a list of converted payloads, or the delegate's conversion of {@code message}
     *         itself when it carries no reply list
     */
    @Override
    public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
        Object header = message.getMessageProperties().getHeaders().get("replies");
        if (!(header instanceof List)) {
            // Not a combined reply: convert it as an ordinary message instead of failing
            // with a NullPointerException on the missing header.
            return this.delegate.fromMessage(message);
        }
        @SuppressWarnings("unchecked")
        List<Message> list = (List<Message>) header;
        return list.stream()
                .map(this.delegate::fromMessage)
                .collect(Collectors.toList());
    }

}
and
[FOO, foofoo]
I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize the DirectReplyToMessageListenerContainer
that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
– alturkovic
Nov 8 at 14:35
1
It's not the container that doesn't support multiple replies, it's the template's onMessage()
method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage()
method; accumulate the replies then synthesize a single Message
from the replies and call super.onMessage()
.
– Gary Russell
Nov 8 at 14:55
I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies
and a timeout
. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate
, thanks!
– alturkovic
Nov 8 at 15:02
Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>
) then use a custom MessageConverter
to create your actual reply for the sendAndReceive method.
– Gary Russell
Nov 8 at 15:39
Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
– alturkovic
Nov 8 at 15:43
|
show 5 more comments
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
1
down vote
accepted
It is not designed to do that; you would need to use a RabbitTemplate.send()
operation and a stand-alone listener container and you would correlate the replies in your code.
EDIT
Here's one way to achieve it (as long as you know how many replies to expect)...
/**
 * Demo application for collecting several replies to a single request.
 *
 * <p>The runner sends one message to the exchange named {@code fanout}; the two
 * listeners below each return a reply, and the template hands both back to the
 * caller as one {@code List<String>} (presumably the {@code foo} and {@code bar}
 * queues are both bound to that exchange - the bindings are not declared here).
 */
@SpringBootApplication
public class So53206036Application {

    public static void main(String[] args) {
        SpringApplication.run(So53206036Application.class, args);
    }

    /** Builds the multi-reply template and installs the list-aware converter on it. */
    @Bean
    public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
        MultiReplyTemplate template = new MultiReplyTemplate();
        template.setConnectionFactory(cf);
        template.setMessageConverter(listConverter());
        return template;
    }

    /** Converter wrapping a {@link SimpleMessageConverter} for the individual payloads. */
    @Bean
    public ListConverter listConverter() {
        return new ListConverter(new SimpleMessageConverter());
    }

    /** Replies with the upper-cased request. */
    @RabbitListener(queues = "foo")
    public String listen1(String in) {
        return in.toUpperCase();
    }

    /** Replies with the request concatenated with itself. */
    @RabbitListener(queues = "bar")
    public String listen2(String in) {
        return in + in;
    }

    /** Sends {@code "foo"} with an empty routing key at startup and prints the combined reply. */
    @Bean
    public ApplicationRunner runner(MultiReplyTemplate template) {
        return args -> {
            List<String> reply = template.convertSendAndReceiveAsType("fanout", "", "foo",
                    new ParameterizedTypeReference<List<String>>() { });
            System.out.println(reply);
        };
    }

}
/**
 * {@link RabbitTemplate} that holds back replies sharing a correlation id until the
 * expected number has arrived, then passes the parent a single synthetic message whose
 * {@code "replies"} header carries the collected reply messages.
 */
class MultiReplyTemplate extends RabbitTemplate {

    /** Reply count used by the no-arg constructor. */
    private static final int DEFAULT_EXPECTED_REPLIES = 2;

    /** Empty body for the synthetic combined message; the payloads live in the header. */
    private static final byte[] NOBODY = new byte[0];

    /** Partially collected replies, keyed by correlation id. */
    private final Map<String, Message> replies = new HashMap<>();

    /** Number of replies that completes one request. */
    private final int expectedReplies;

    /** Creates a template that waits for two replies per request. */
    MultiReplyTemplate() {
        this(DEFAULT_EXPECTED_REPLIES);
    }

    /**
     * Creates a template that waits for the given number of replies per request.
     *
     * @param expectedReplies replies to collect before completing; must be at least 1
     * @throws IllegalArgumentException if {@code expectedReplies} is less than 1
     */
    MultiReplyTemplate(int expectedReplies) {
        if (expectedReplies < 1) {
            throw new IllegalArgumentException("expectedReplies must be >= 1, was " + expectedReplies);
        }
        this.expectedReplies = expectedReplies;
    }

    /**
     * Stores the reply under its correlation id and, once enough replies are present,
     * forwards one combined message to {@link RabbitTemplate#onMessage(Message)}.
     *
     * @param message a single reply
     */
    @Override
    public void onMessage(Message message) {
        // Not thread-safe but that's ok since the DRTMLC is single-threaded.
        // NOTE: an entry is only removed once all expected replies arrive; replies for a
        // request that never completes stay in the map.
        String corr = message.getMessageProperties().getCorrelationId();
        Message combined = this.replies.get(corr);
        if (combined == null) {
            combined = new Message(NOBODY, new MessageProperties());
            combined.getMessageProperties().getHeaders().put("replies", new ArrayList<Message>());
            this.replies.put(corr, combined);
        }
        @SuppressWarnings("unchecked")
        List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get("replies");
        list.add(message);
        if (list.size() >= this.expectedReplies) {
            this.replies.remove(corr);
            // The parent matches the reply to its request via the correlation id.
            combined.getMessageProperties().setCorrelationId(corr);
            super.onMessage(combined);
        }
    }

}
/**
 * {@link SmartMessageConverter} that turns a combined message (one whose
 * {@code "replies"} header holds a {@code List<Message>}) into a list of converted
 * payloads. All other conversions are passed straight to the delegate.
 */
class ListConverter implements SmartMessageConverter {

    /** Converter used for each individual message. */
    private final MessageConverter delegate;

    ListConverter(MessageConverter delegate) {
        this.delegate = delegate;
    }

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return this.delegate.toMessage(object, messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return this.delegate.fromMessage(message); // for listeners
    }

    /**
     * Converts every message stored in the {@code "replies"} header and returns the results
     * in order. The conversion hint is not used.
     *
     * @return a list of converted payloads, or the delegate's conversion of {@code message}
     *         itself when it carries no reply list
     */
    @Override
    public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
        Object header = message.getMessageProperties().getHeaders().get("replies");
        if (!(header instanceof List)) {
            // Not a combined reply: convert it as an ordinary message instead of failing
            // with a NullPointerException on the missing header.
            return this.delegate.fromMessage(message);
        }
        @SuppressWarnings("unchecked")
        List<Message> list = (List<Message>) header;
        return list.stream()
                .map(this.delegate::fromMessage)
                .collect(Collectors.toList());
    }

}
and
[FOO, foofoo]
I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize the DirectReplyToMessageListenerContainer
that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
– alturkovic
Nov 8 at 14:35
1
It's not the container that doesn't support multiple replies, it's the template's onMessage()
method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage()
method; accumulate the replies then synthesize a single Message
from the replies and call super.onMessage()
.
– Gary Russell
Nov 8 at 14:55
I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies
and a timeout
. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate
, thanks!
– alturkovic
Nov 8 at 15:02
Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>
) then use a custom MessageConverter
to create your actual reply for the sendAndReceive method.
– Gary Russell
Nov 8 at 15:39
Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
– alturkovic
Nov 8 at 15:43
|
show 5 more comments
up vote
1
down vote
accepted
It is not designed to do that; you would need to use a RabbitTemplate.send()
operation and a stand-alone listener container and you would correlate the replies in your code.
EDIT
Here's one way to achieve it (as long as you know how many replies to expect)...
/**
 * Demo application for collecting several replies to a single request.
 *
 * <p>The runner sends one message to the exchange named {@code fanout}; the two
 * listeners below each return a reply, and the template hands both back to the
 * caller as one {@code List<String>} (presumably the {@code foo} and {@code bar}
 * queues are both bound to that exchange - the bindings are not declared here).
 */
@SpringBootApplication
public class So53206036Application {

    public static void main(String[] args) {
        SpringApplication.run(So53206036Application.class, args);
    }

    /** Builds the multi-reply template and installs the list-aware converter on it. */
    @Bean
    public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
        MultiReplyTemplate template = new MultiReplyTemplate();
        template.setConnectionFactory(cf);
        template.setMessageConverter(listConverter());
        return template;
    }

    /** Converter wrapping a {@link SimpleMessageConverter} for the individual payloads. */
    @Bean
    public ListConverter listConverter() {
        return new ListConverter(new SimpleMessageConverter());
    }

    /** Replies with the upper-cased request. */
    @RabbitListener(queues = "foo")
    public String listen1(String in) {
        return in.toUpperCase();
    }

    /** Replies with the request concatenated with itself. */
    @RabbitListener(queues = "bar")
    public String listen2(String in) {
        return in + in;
    }

    /** Sends {@code "foo"} with an empty routing key at startup and prints the combined reply. */
    @Bean
    public ApplicationRunner runner(MultiReplyTemplate template) {
        return args -> {
            List<String> reply = template.convertSendAndReceiveAsType("fanout", "", "foo",
                    new ParameterizedTypeReference<List<String>>() { });
            System.out.println(reply);
        };
    }

}
/**
 * {@link RabbitTemplate} that holds back replies sharing a correlation id until the
 * expected number has arrived, then passes the parent a single synthetic message whose
 * {@code "replies"} header carries the collected reply messages.
 */
class MultiReplyTemplate extends RabbitTemplate {

    /** Reply count used by the no-arg constructor. */
    private static final int DEFAULT_EXPECTED_REPLIES = 2;

    /** Empty body for the synthetic combined message; the payloads live in the header. */
    private static final byte[] NOBODY = new byte[0];

    /** Partially collected replies, keyed by correlation id. */
    private final Map<String, Message> replies = new HashMap<>();

    /** Number of replies that completes one request. */
    private final int expectedReplies;

    /** Creates a template that waits for two replies per request. */
    MultiReplyTemplate() {
        this(DEFAULT_EXPECTED_REPLIES);
    }

    /**
     * Creates a template that waits for the given number of replies per request.
     *
     * @param expectedReplies replies to collect before completing; must be at least 1
     * @throws IllegalArgumentException if {@code expectedReplies} is less than 1
     */
    MultiReplyTemplate(int expectedReplies) {
        if (expectedReplies < 1) {
            throw new IllegalArgumentException("expectedReplies must be >= 1, was " + expectedReplies);
        }
        this.expectedReplies = expectedReplies;
    }

    /**
     * Stores the reply under its correlation id and, once enough replies are present,
     * forwards one combined message to {@link RabbitTemplate#onMessage(Message)}.
     *
     * @param message a single reply
     */
    @Override
    public void onMessage(Message message) {
        // Not thread-safe but that's ok since the DRTMLC is single-threaded.
        // NOTE: an entry is only removed once all expected replies arrive; replies for a
        // request that never completes stay in the map.
        String corr = message.getMessageProperties().getCorrelationId();
        Message combined = this.replies.get(corr);
        if (combined == null) {
            combined = new Message(NOBODY, new MessageProperties());
            combined.getMessageProperties().getHeaders().put("replies", new ArrayList<Message>());
            this.replies.put(corr, combined);
        }
        @SuppressWarnings("unchecked")
        List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get("replies");
        list.add(message);
        if (list.size() >= this.expectedReplies) {
            this.replies.remove(corr);
            // The parent matches the reply to its request via the correlation id.
            combined.getMessageProperties().setCorrelationId(corr);
            super.onMessage(combined);
        }
    }

}
/**
 * {@link SmartMessageConverter} that turns a combined message (one whose
 * {@code "replies"} header holds a {@code List<Message>}) into a list of converted
 * payloads. All other conversions are passed straight to the delegate.
 */
class ListConverter implements SmartMessageConverter {

    /** Converter used for each individual message. */
    private final MessageConverter delegate;

    ListConverter(MessageConverter delegate) {
        this.delegate = delegate;
    }

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return this.delegate.toMessage(object, messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return this.delegate.fromMessage(message); // for listeners
    }

    /**
     * Converts every message stored in the {@code "replies"} header and returns the results
     * in order. The conversion hint is not used.
     *
     * @return a list of converted payloads, or the delegate's conversion of {@code message}
     *         itself when it carries no reply list
     */
    @Override
    public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
        Object header = message.getMessageProperties().getHeaders().get("replies");
        if (!(header instanceof List)) {
            // Not a combined reply: convert it as an ordinary message instead of failing
            // with a NullPointerException on the missing header.
            return this.delegate.fromMessage(message);
        }
        @SuppressWarnings("unchecked")
        List<Message> list = (List<Message>) header;
        return list.stream()
                .map(this.delegate::fromMessage)
                .collect(Collectors.toList());
    }

}
and
[FOO, foofoo]
I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize the DirectReplyToMessageListenerContainer
that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
– alturkovic
Nov 8 at 14:35
1
It's not the container that doesn't support multiple replies, it's the template's onMessage()
method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage()
method; accumulate the replies then synthesize a single Message
from the replies and call super.onMessage()
.
– Gary Russell
Nov 8 at 14:55
I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies
and a timeout
. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate
, thanks!
– alturkovic
Nov 8 at 15:02
Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>
) then use a custom MessageConverter
to create your actual reply for the sendAndReceive method.
– Gary Russell
Nov 8 at 15:39
Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
– alturkovic
Nov 8 at 15:43
|
show 5 more comments
up vote
1
down vote
accepted
up vote
1
down vote
accepted
It is not designed to do that; you would need to use a RabbitTemplate.send()
operation and a stand-alone listener container and you would correlate the replies in your code.
EDIT
Here's one way to achieve it (as long as you know how many replies to expect)...
/**
 * Demo application for collecting several replies to a single request.
 *
 * <p>The runner sends one message to the exchange named {@code fanout}; the two
 * listeners below each return a reply, and the template hands both back to the
 * caller as one {@code List<String>} (presumably the {@code foo} and {@code bar}
 * queues are both bound to that exchange - the bindings are not declared here).
 */
@SpringBootApplication
public class So53206036Application {

    public static void main(String[] args) {
        SpringApplication.run(So53206036Application.class, args);
    }

    /** Builds the multi-reply template and installs the list-aware converter on it. */
    @Bean
    public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
        MultiReplyTemplate template = new MultiReplyTemplate();
        template.setConnectionFactory(cf);
        template.setMessageConverter(listConverter());
        return template;
    }

    /** Converter wrapping a {@link SimpleMessageConverter} for the individual payloads. */
    @Bean
    public ListConverter listConverter() {
        return new ListConverter(new SimpleMessageConverter());
    }

    /** Replies with the upper-cased request. */
    @RabbitListener(queues = "foo")
    public String listen1(String in) {
        return in.toUpperCase();
    }

    /** Replies with the request concatenated with itself. */
    @RabbitListener(queues = "bar")
    public String listen2(String in) {
        return in + in;
    }

    /** Sends {@code "foo"} with an empty routing key at startup and prints the combined reply. */
    @Bean
    public ApplicationRunner runner(MultiReplyTemplate template) {
        return args -> {
            List<String> reply = template.convertSendAndReceiveAsType("fanout", "", "foo",
                    new ParameterizedTypeReference<List<String>>() { });
            System.out.println(reply);
        };
    }

}
/**
 * {@link RabbitTemplate} that holds back replies sharing a correlation id until the
 * expected number has arrived, then passes the parent a single synthetic message whose
 * {@code "replies"} header carries the collected reply messages.
 */
class MultiReplyTemplate extends RabbitTemplate {

    /** Reply count used by the no-arg constructor. */
    private static final int DEFAULT_EXPECTED_REPLIES = 2;

    /** Empty body for the synthetic combined message; the payloads live in the header. */
    private static final byte[] NOBODY = new byte[0];

    /** Partially collected replies, keyed by correlation id. */
    private final Map<String, Message> replies = new HashMap<>();

    /** Number of replies that completes one request. */
    private final int expectedReplies;

    /** Creates a template that waits for two replies per request. */
    MultiReplyTemplate() {
        this(DEFAULT_EXPECTED_REPLIES);
    }

    /**
     * Creates a template that waits for the given number of replies per request.
     *
     * @param expectedReplies replies to collect before completing; must be at least 1
     * @throws IllegalArgumentException if {@code expectedReplies} is less than 1
     */
    MultiReplyTemplate(int expectedReplies) {
        if (expectedReplies < 1) {
            throw new IllegalArgumentException("expectedReplies must be >= 1, was " + expectedReplies);
        }
        this.expectedReplies = expectedReplies;
    }

    /**
     * Stores the reply under its correlation id and, once enough replies are present,
     * forwards one combined message to {@link RabbitTemplate#onMessage(Message)}.
     *
     * @param message a single reply
     */
    @Override
    public void onMessage(Message message) {
        // Not thread-safe but that's ok since the DRTMLC is single-threaded.
        // NOTE: an entry is only removed once all expected replies arrive; replies for a
        // request that never completes stay in the map.
        String corr = message.getMessageProperties().getCorrelationId();
        Message combined = this.replies.get(corr);
        if (combined == null) {
            combined = new Message(NOBODY, new MessageProperties());
            combined.getMessageProperties().getHeaders().put("replies", new ArrayList<Message>());
            this.replies.put(corr, combined);
        }
        @SuppressWarnings("unchecked")
        List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get("replies");
        list.add(message);
        if (list.size() >= this.expectedReplies) {
            this.replies.remove(corr);
            // The parent matches the reply to its request via the correlation id.
            combined.getMessageProperties().setCorrelationId(corr);
            super.onMessage(combined);
        }
    }

}
/**
 * {@link SmartMessageConverter} that turns a combined message (one whose
 * {@code "replies"} header holds a {@code List<Message>}) into a list of converted
 * payloads. All other conversions are passed straight to the delegate.
 */
class ListConverter implements SmartMessageConverter {

    /** Converter used for each individual message. */
    private final MessageConverter delegate;

    ListConverter(MessageConverter delegate) {
        this.delegate = delegate;
    }

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return this.delegate.toMessage(object, messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return this.delegate.fromMessage(message); // for listeners
    }

    /**
     * Converts every message stored in the {@code "replies"} header and returns the results
     * in order. The conversion hint is not used.
     *
     * @return a list of converted payloads, or the delegate's conversion of {@code message}
     *         itself when it carries no reply list
     */
    @Override
    public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
        Object header = message.getMessageProperties().getHeaders().get("replies");
        if (!(header instanceof List)) {
            // Not a combined reply: convert it as an ordinary message instead of failing
            // with a NullPointerException on the missing header.
            return this.delegate.fromMessage(message);
        }
        @SuppressWarnings("unchecked")
        List<Message> list = (List<Message>) header;
        return list.stream()
                .map(this.delegate::fromMessage)
                .collect(Collectors.toList());
    }

}
and
[FOO, foofoo]
It is not designed to do that; you would need to use a RabbitTemplate.send()
operation and a stand-alone listener container and you would correlate the replies in your code.
EDIT
Here's one way to achieve it (as long as you know how many replies to expect)...
/**
 * Demo application for collecting several replies to a single request.
 *
 * <p>The runner sends one message to the exchange named {@code fanout}; the two
 * listeners below each return a reply, and the template hands both back to the
 * caller as one {@code List<String>} (presumably the {@code foo} and {@code bar}
 * queues are both bound to that exchange - the bindings are not declared here).
 */
@SpringBootApplication
public class So53206036Application {

    public static void main(String[] args) {
        SpringApplication.run(So53206036Application.class, args);
    }

    /** Builds the multi-reply template and installs the list-aware converter on it. */
    @Bean
    public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
        MultiReplyTemplate template = new MultiReplyTemplate();
        template.setConnectionFactory(cf);
        template.setMessageConverter(listConverter());
        return template;
    }

    /** Converter wrapping a {@link SimpleMessageConverter} for the individual payloads. */
    @Bean
    public ListConverter listConverter() {
        return new ListConverter(new SimpleMessageConverter());
    }

    /** Replies with the upper-cased request. */
    @RabbitListener(queues = "foo")
    public String listen1(String in) {
        return in.toUpperCase();
    }

    /** Replies with the request concatenated with itself. */
    @RabbitListener(queues = "bar")
    public String listen2(String in) {
        return in + in;
    }

    /** Sends {@code "foo"} with an empty routing key at startup and prints the combined reply. */
    @Bean
    public ApplicationRunner runner(MultiReplyTemplate template) {
        return args -> {
            List<String> reply = template.convertSendAndReceiveAsType("fanout", "", "foo",
                    new ParameterizedTypeReference<List<String>>() { });
            System.out.println(reply);
        };
    }

}
/**
 * {@link RabbitTemplate} that holds back replies sharing a correlation id until the
 * expected number has arrived, then passes the parent a single synthetic message whose
 * {@code "replies"} header carries the collected reply messages.
 */
class MultiReplyTemplate extends RabbitTemplate {

    /** Reply count used by the no-arg constructor. */
    private static final int DEFAULT_EXPECTED_REPLIES = 2;

    /** Empty body for the synthetic combined message; the payloads live in the header. */
    private static final byte[] NOBODY = new byte[0];

    /** Partially collected replies, keyed by correlation id. */
    private final Map<String, Message> replies = new HashMap<>();

    /** Number of replies that completes one request. */
    private final int expectedReplies;

    /** Creates a template that waits for two replies per request. */
    MultiReplyTemplate() {
        this(DEFAULT_EXPECTED_REPLIES);
    }

    /**
     * Creates a template that waits for the given number of replies per request.
     *
     * @param expectedReplies replies to collect before completing; must be at least 1
     * @throws IllegalArgumentException if {@code expectedReplies} is less than 1
     */
    MultiReplyTemplate(int expectedReplies) {
        if (expectedReplies < 1) {
            throw new IllegalArgumentException("expectedReplies must be >= 1, was " + expectedReplies);
        }
        this.expectedReplies = expectedReplies;
    }

    /**
     * Stores the reply under its correlation id and, once enough replies are present,
     * forwards one combined message to {@link RabbitTemplate#onMessage(Message)}.
     *
     * @param message a single reply
     */
    @Override
    public void onMessage(Message message) {
        // Not thread-safe but that's ok since the DRTMLC is single-threaded.
        // NOTE: an entry is only removed once all expected replies arrive; replies for a
        // request that never completes stay in the map.
        String corr = message.getMessageProperties().getCorrelationId();
        Message combined = this.replies.get(corr);
        if (combined == null) {
            combined = new Message(NOBODY, new MessageProperties());
            combined.getMessageProperties().getHeaders().put("replies", new ArrayList<Message>());
            this.replies.put(corr, combined);
        }
        @SuppressWarnings("unchecked")
        List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get("replies");
        list.add(message);
        if (list.size() >= this.expectedReplies) {
            this.replies.remove(corr);
            // The parent matches the reply to its request via the correlation id.
            combined.getMessageProperties().setCorrelationId(corr);
            super.onMessage(combined);
        }
    }

}
/**
 * Message converter that turns the synthetic "combined" reply (real replies
 * stored as a {@code List<Message>} under the {@code "replies"} header) into a
 * list of converted payloads. All other conversions go straight to the
 * wrapped converter.
 */
class ListConverter implements SmartMessageConverter {

    /** Converter that handles each individual message. */
    private final MessageConverter delegate;

    /**
     * @param delegate converter applied to outbound messages, plain inbound
     *                 messages, and each element of a combined reply
     */
    ListConverter(MessageConverter delegate) {
        this.delegate = delegate;
    }

    /** Outbound conversion is passed through to the delegate unchanged. */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        Message outbound = this.delegate.toMessage(object, messageProperties);
        return outbound;
    }

    /** Plain inbound conversion (the path used for listeners). */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object payload = this.delegate.fromMessage(message);
        return payload;
    }

    /**
     * Converts a combined reply into a list of payloads.
     *
     * @param message        the combined message carrying the {@code "replies"} header
     * @param conversionHint not consulted by this implementation
     * @return the converted payloads, in the order the replies were stored
     */
    @Override
    public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
        Object header = message.getMessageProperties().getHeaders().get("replies");
        @SuppressWarnings("unchecked")
        List<Message> parts = (List<Message>) header;
        return parts.stream()
                .map(part -> this.delegate.fromMessage(part))
                .collect(Collectors.toList());
    }
}
and
[FOO, foofoo]
edited Nov 8 at 16:35
answered Nov 8 at 14:26
Gary Russell
76.2k64166
76.2k64166
I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize the DirectReplyToMessageListenerContainer
that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
– alturkovic
Nov 8 at 14:35
1
It's not the container that doesn't support multiple replies, it's the template's onMessage()
method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage()
method; accumulate the replies then synthesize a single Message
from the replies and call super.onMessage()
.
– Gary Russell
Nov 8 at 14:55
I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies
and a timeout
. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate
, thanks!
– alturkovic
Nov 8 at 15:02
Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>
) then use a custom MessageConverter
to create your actual reply for the sendAndReceive method.
– Gary Russell
Nov 8 at 15:39
Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
– alturkovic
Nov 8 at 15:43
|
show 5 more comments
I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize the DirectReplyToMessageListenerContainer
that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!
– alturkovic
Nov 8 at 14:35
1
It's not the container that doesn't support multiple replies, it's the template's onMessage()
method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage()
method; accumulate the replies then synthesize a single Message
from the replies and call super.onMessage()
.
– Gary Russell
Nov 8 at 14:55
I know in advance how many messages I intend to receive and would pass that as a parameter, i.e. expectedReplies
and a timeout
. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate
, thanks!
– alturkovic
Nov 8 at 15:02
Probably the easiest way would be to create a dummy message and add the real replies to a header (List<Message>
) then use a custom MessageConverter
to create your actual reply for the sendAndReceive method.
– Gary Russell
Nov 8 at 15:39
Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
– alturkovic
Nov 8 at 15:43
I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize
DirectReplyToMessageListenerContainer
that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!– alturkovic
Nov 8 at 14:35
I understand it was not intended for this, but it actually "almost works". Multiple reply messages are received, but since the listener does not expect multiple messages, it throws an exception as intended. I was just wondering if there was a way to customize
DirectReplyToMessageListenerContainer
that the template creates so it can receive multiple messages. I was trying to avoid a standalone listener container and use the Direct Reply-to AMQP mechanism, even though it might be an abuse, it is what I need for the project. Thanks for the reply nonetheless!– alturkovic
Nov 8 at 14:35
1
1
It's not the container that doesn't support multiple replies, it's the template's
onMessage()
method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage()
method; accumulate the replies then synthesize a single Message
from the replies and call super.onMessage()
.– Gary Russell
Nov 8 at 14:55
It's not the container that doesn't support multiple replies, it's the template's
onMessage()
method. It's not clear to me how you will know when "all" replies are received, unless it's always a fixed number. You can subclass the template and override the onMessage()
method; accumulate the replies then synthesize a single Message
from the replies and call super.onMessage()
.– Gary Russell
Nov 8 at 14:55
I know in advance how many messages I intend to receive and would pass that as a parameter, i.e.
expectedReplies
and a timeout
. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate
, thanks!– alturkovic
Nov 8 at 15:02
I know in advance how many messages I intend to receive and would pass that as a parameter, i.e.
expectedReplies
and a timeout
. That sounds like what I am trying to do, I'll try to implement using a subclassed RabbitTemplate
, thanks!– alturkovic
Nov 8 at 15:02
Probably the easiest way would be to create a dummy message and add the real replies to a header (
List<Message>
) then use a custom MessageConverter
to create your actual reply for the sendAndReceive method.– Gary Russell
Nov 8 at 15:39
Probably the easiest way would be to create a dummy message and add the real replies to a header (
List<Message>
) then use a custom MessageConverter
to create your actual reply for the sendAndReceive method.– Gary Russell
Nov 8 at 15:39
Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
– alturkovic
Nov 8 at 15:43
Sorry, it seems there is a misunderstanding, the multi-message reply does not come from one application, there are multiple producers replying to the request message from different processes. Sort of like broadcast request-reply.
– alturkovic
Nov 8 at 15:43
|
show 5 more comments
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53206036%2frabbittemplate-extending-directreplytomessagelistenercontainer%23new-answer', 'question_page');
}
);
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password