ActiveMQ Dead Letter Queue with Spring Boot
up vote
0
down vote
favorite
I would like to implement a basic deadLetterQueue with using Java DSL, an embedded ActiveMQ and KahaDB as persistence adapter. I have a text file in my C:/JavaProjects/tmp/input
directory. I would like to take the file into an internal queue, produce an unsuccessful delivery with calling .rollback(), then the message should forward to deadLetterQueue, after that I will take the data from deadLetterQueue and put it to C:/JavaProjects/tmp/output
directory. All messages are successfully delivered to incoming
but apparently they do not go to dead letter queue. How do I do that?
Here is my main class:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.io.File;
import java.util.Arrays;
@SpringBootApplication
public class DemoApplication {
public static void main(String args) {
SpringApplication.run(DemoApplication.class, args);
}
@Value("${activemq.broker-url}")
private String brokerUrl;
@Value("${activemq.user}")
private String userName;
@Value("${activemq.password}")
private String password;
@Bean
public BrokerService setupBroker() {
try {
brokerService.addConnector(brokerUrl);
brokerService.setUseJmx(true);
brokerService.setUseShutdownHook(false);
brokerService.setAdvisorySupport(true);
brokerService.setEnableStatistics(true);
brokerService.setPersistenceAdapter(createKahaDBPersistenceAdapter());
brokerService.setTempDataStore(createKahaDBTempDataStore());
brokerService.start();
Thread.sleep(6000);
brokerService.stop();
} catch (Exception e) {
e.printStackTrace();
}
return brokerService;
}
private KahaDBPersistenceAdapter createKahaDBPersistenceAdapter() {
final KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
kahaDBPersistenceAdapter.setDirectory(new File("C:\JavaProjects\tmp", "activemq/kahadb"));
kahaDBPersistenceAdapter.setCompactAcksIgnoresStoreGrowth(true);
kahaDBPersistenceAdapter.setCompactAcksAfterNoGC(5);
return kahaDBPersistenceAdapter;
}
private PListStoreImpl createKahaDBTempDataStore() {
final PListStoreImpl tempKahaDBStore = new PListStoreImpl();
tempKahaDBStore.setDirectory(new File("C:\JavaProjects\tmp", "activemq/tmp"));
return tempKahaDBStore;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.setTrustedPackages(Arrays.asList("com.example.demo"));
return factory;
}
}
Here is my router:
@Component
public class MyRouter extends RouteBuilder {
private static Logger logger = LoggerFactory.getLogger(MyRouter.class);
@Override
public void configure() throws Exception {
onException(Exception.class)
.log(LoggingLevel.ERROR, logger, "THE ERROR IS: " + exceptionMessage().toString());
errorHandler(deadLetterChannel("activemq:queue:deadLetterQueue")
.deadLetterHandleNewException(true)//guarantees to deadLetterChannel to complete.
.useOriginalMessage()
.backOffMultiplier(2)
.redeliveryDelay(100)
.maximumRedeliveries(3)
.log("TO DEAD LETTER QUEUE!"));
/*
* Receiving data to an internal queue "incoming",
* then calling rollback() to send the info from internal queue to dead Letter queue
* */
from("file:/JavaProjects/tmp/input?noop=true")
.to("activemq:queue:incoming");
from("activemq:queue:incoming")
.process(new MyProcessor())
.rollback("'INSIDE ROLLBACK' Lets go to the DEAD QUEUE!")
.to("file:/JavaProjects/tmp/output");
//Getting data from Dead Letter Queue and put it to our output folder
from("activemq:queue:deadLetterQueue")
.process(new MyProcessor())
.to("file:/JavaProjects/tmp/output");
}
}
Here is my processor:
public class MyProcessor implements Processor{
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("nMessage ID is: " + exchange.getIn().getMessageId());
System.out.println("nFile`s content is: " + exchange.getIn().getBody(String.class) + "n");
}
}
Any help would be appreciated.
java spring spring-boot activemq dead-letter
add a comment |
up vote
0
down vote
favorite
I would like to implement a basic deadLetterQueue with using Java DSL, an embedded ActiveMQ and KahaDB as persistence adapter. I have a text file in my C:/JavaProjects/tmp/input
directory. I would like to take the file into an internal queue, produce an unsuccessful delivery with calling .rollback(), then the message should forward to deadLetterQueue, after that I will take the data from deadLetterQueue and put it to C:/JavaProjects/tmp/output
directory. All messages are successfully delivered to incoming
but apparently they do not go to dead letter queue. How do I do that?
Here is my main class:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.io.File;
import java.util.Arrays;
@SpringBootApplication
public class DemoApplication {
public static void main(String args) {
SpringApplication.run(DemoApplication.class, args);
}
@Value("${activemq.broker-url}")
private String brokerUrl;
@Value("${activemq.user}")
private String userName;
@Value("${activemq.password}")
private String password;
@Bean
public BrokerService setupBroker() {
try {
brokerService.addConnector(brokerUrl);
brokerService.setUseJmx(true);
brokerService.setUseShutdownHook(false);
brokerService.setAdvisorySupport(true);
brokerService.setEnableStatistics(true);
brokerService.setPersistenceAdapter(createKahaDBPersistenceAdapter());
brokerService.setTempDataStore(createKahaDBTempDataStore());
brokerService.start();
Thread.sleep(6000);
brokerService.stop();
} catch (Exception e) {
e.printStackTrace();
}
return brokerService;
}
private KahaDBPersistenceAdapter createKahaDBPersistenceAdapter() {
final KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
kahaDBPersistenceAdapter.setDirectory(new File("C:\JavaProjects\tmp", "activemq/kahadb"));
kahaDBPersistenceAdapter.setCompactAcksIgnoresStoreGrowth(true);
kahaDBPersistenceAdapter.setCompactAcksAfterNoGC(5);
return kahaDBPersistenceAdapter;
}
private PListStoreImpl createKahaDBTempDataStore() {
final PListStoreImpl tempKahaDBStore = new PListStoreImpl();
tempKahaDBStore.setDirectory(new File("C:\JavaProjects\tmp", "activemq/tmp"));
return tempKahaDBStore;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.setTrustedPackages(Arrays.asList("com.example.demo"));
return factory;
}
}
Here is my router:
@Component
public class MyRouter extends RouteBuilder {
private static Logger logger = LoggerFactory.getLogger(MyRouter.class);
@Override
public void configure() throws Exception {
onException(Exception.class)
.log(LoggingLevel.ERROR, logger, "THE ERROR IS: " + exceptionMessage().toString());
errorHandler(deadLetterChannel("activemq:queue:deadLetterQueue")
.deadLetterHandleNewException(true)//guarantees to deadLetterChannel to complete.
.useOriginalMessage()
.backOffMultiplier(2)
.redeliveryDelay(100)
.maximumRedeliveries(3)
.log("TO DEAD LETTER QUEUE!"));
/*
* Receiving data to an internal queue "incoming",
* then calling rollback() to send the info from internal queue to dead Letter queue
* */
from("file:/JavaProjects/tmp/input?noop=true")
.to("activemq:queue:incoming");
from("activemq:queue:incoming")
.process(new MyProcessor())
.rollback("'INSIDE ROLLBACK' Lets go to the DEAD QUEUE!")
.to("file:/JavaProjects/tmp/output");
//Getting data from Dead Letter Queue and put it to our output folder
from("activemq:queue:deadLetterQueue")
.process(new MyProcessor())
.to("file:/JavaProjects/tmp/output");
}
}
Here is my processor:
public class MyProcessor implements Processor{
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("nMessage ID is: " + exchange.getIn().getMessageId());
System.out.println("nFile`s content is: " + exchange.getIn().getBody(String.class) + "n");
}
}
Any help would be appreciated.
java spring spring-boot activemq dead-letter
add a comment |
up vote
0
down vote
favorite
up vote
0
down vote
favorite
I would like to implement a basic deadLetterQueue with using Java DSL, an embedded ActiveMQ and KahaDB as persistence adapter. I have a text file in my C:/JavaProjects/tmp/input
directory. I would like to take the file into an internal queue, produce an unsuccessful delivery with calling .rollback(), then the message should forward to deadLetterQueue, after that I will take the data from deadLetterQueue and put it to C:/JavaProjects/tmp/output
directory. All messages are successfully delivered to incoming
but apparently they do not go to dead letter queue. How do I do that?
Here is my main class:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.io.File;
import java.util.Arrays;
@SpringBootApplication
public class DemoApplication {
public static void main(String args) {
SpringApplication.run(DemoApplication.class, args);
}
@Value("${activemq.broker-url}")
private String brokerUrl;
@Value("${activemq.user}")
private String userName;
@Value("${activemq.password}")
private String password;
@Bean
public BrokerService setupBroker() {
try {
brokerService.addConnector(brokerUrl);
brokerService.setUseJmx(true);
brokerService.setUseShutdownHook(false);
brokerService.setAdvisorySupport(true);
brokerService.setEnableStatistics(true);
brokerService.setPersistenceAdapter(createKahaDBPersistenceAdapter());
brokerService.setTempDataStore(createKahaDBTempDataStore());
brokerService.start();
Thread.sleep(6000);
brokerService.stop();
} catch (Exception e) {
e.printStackTrace();
}
return brokerService;
}
private KahaDBPersistenceAdapter createKahaDBPersistenceAdapter() {
final KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
kahaDBPersistenceAdapter.setDirectory(new File("C:\JavaProjects\tmp", "activemq/kahadb"));
kahaDBPersistenceAdapter.setCompactAcksIgnoresStoreGrowth(true);
kahaDBPersistenceAdapter.setCompactAcksAfterNoGC(5);
return kahaDBPersistenceAdapter;
}
private PListStoreImpl createKahaDBTempDataStore() {
final PListStoreImpl tempKahaDBStore = new PListStoreImpl();
tempKahaDBStore.setDirectory(new File("C:\JavaProjects\tmp", "activemq/tmp"));
return tempKahaDBStore;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.setTrustedPackages(Arrays.asList("com.example.demo"));
return factory;
}
}
Here is my router:
@Component
public class MyRouter extends RouteBuilder {
private static Logger logger = LoggerFactory.getLogger(MyRouter.class);
@Override
public void configure() throws Exception {
onException(Exception.class)
.log(LoggingLevel.ERROR, logger, "THE ERROR IS: " + exceptionMessage().toString());
errorHandler(deadLetterChannel("activemq:queue:deadLetterQueue")
.deadLetterHandleNewException(true)//guarantees to deadLetterChannel to complete.
.useOriginalMessage()
.backOffMultiplier(2)
.redeliveryDelay(100)
.maximumRedeliveries(3)
.log("TO DEAD LETTER QUEUE!"));
/*
* Receiving data to an internal queue "incoming",
* then calling rollback() to send the info from internal queue to dead Letter queue
* */
from("file:/JavaProjects/tmp/input?noop=true")
.to("activemq:queue:incoming");
from("activemq:queue:incoming")
.process(new MyProcessor())
.rollback("'INSIDE ROLLBACK' Lets go to the DEAD QUEUE!")
.to("file:/JavaProjects/tmp/output");
//Getting data from Dead Letter Queue and put it to our output folder
from("activemq:queue:deadLetterQueue")
.process(new MyProcessor())
.to("file:/JavaProjects/tmp/output");
}
}
Here is my processor:
public class MyProcessor implements Processor{
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("nMessage ID is: " + exchange.getIn().getMessageId());
System.out.println("nFile`s content is: " + exchange.getIn().getBody(String.class) + "n");
}
}
Any help would be appreciated.
java spring spring-boot activemq dead-letter
I would like to implement a basic deadLetterQueue with using Java DSL, an embedded ActiveMQ and KahaDB as persistence adapter. I have a text file in my C:/JavaProjects/tmp/input
directory. I would like to take the file into an internal queue, produce an unsuccessful delivery with calling .rollback(), then the message should forward to deadLetterQueue, after that I will take the data from deadLetterQueue and put it to C:/JavaProjects/tmp/output
directory. All messages are successfully delivered to incoming
but apparently they do not go to dead letter queue. How do I do that?
Here is my main class:
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.io.File;
import java.util.Arrays;
@SpringBootApplication
public class DemoApplication {
public static void main(String args) {
SpringApplication.run(DemoApplication.class, args);
}
@Value("${activemq.broker-url}")
private String brokerUrl;
@Value("${activemq.user}")
private String userName;
@Value("${activemq.password}")
private String password;
@Bean
public BrokerService setupBroker() {
try {
brokerService.addConnector(brokerUrl);
brokerService.setUseJmx(true);
brokerService.setUseShutdownHook(false);
brokerService.setAdvisorySupport(true);
brokerService.setEnableStatistics(true);
brokerService.setPersistenceAdapter(createKahaDBPersistenceAdapter());
brokerService.setTempDataStore(createKahaDBTempDataStore());
brokerService.start();
Thread.sleep(6000);
brokerService.stop();
} catch (Exception e) {
e.printStackTrace();
}
return brokerService;
}
private KahaDBPersistenceAdapter createKahaDBPersistenceAdapter() {
final KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
kahaDBPersistenceAdapter.setDirectory(new File("C:\JavaProjects\tmp", "activemq/kahadb"));
kahaDBPersistenceAdapter.setCompactAcksIgnoresStoreGrowth(true);
kahaDBPersistenceAdapter.setCompactAcksAfterNoGC(5);
return kahaDBPersistenceAdapter;
}
private PListStoreImpl createKahaDBTempDataStore() {
final PListStoreImpl tempKahaDBStore = new PListStoreImpl();
tempKahaDBStore.setDirectory(new File("C:\JavaProjects\tmp", "activemq/tmp"));
return tempKahaDBStore;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.setTrustedPackages(Arrays.asList("com.example.demo"));
return factory;
}
}
Here is my router:
@Component
public class MyRouter extends RouteBuilder {
private static Logger logger = LoggerFactory.getLogger(MyRouter.class);
@Override
public void configure() throws Exception {
onException(Exception.class)
.log(LoggingLevel.ERROR, logger, "THE ERROR IS: " + exceptionMessage().toString());
errorHandler(deadLetterChannel("activemq:queue:deadLetterQueue")
.deadLetterHandleNewException(true)//guarantees to deadLetterChannel to complete.
.useOriginalMessage()
.backOffMultiplier(2)
.redeliveryDelay(100)
.maximumRedeliveries(3)
.log("TO DEAD LETTER QUEUE!"));
/*
* Receiving data to an internal queue "incoming",
* then calling rollback() to send the info from internal queue to dead Letter queue
* */
from("file:/JavaProjects/tmp/input?noop=true")
.to("activemq:queue:incoming");
from("activemq:queue:incoming")
.process(new MyProcessor())
.rollback("'INSIDE ROLLBACK' Lets go to the DEAD QUEUE!")
.to("file:/JavaProjects/tmp/output");
//Getting data from Dead Letter Queue and put it to our output folder
from("activemq:queue:deadLetterQueue")
.process(new MyProcessor())
.to("file:/JavaProjects/tmp/output");
}
}
Here is my processor:
public class MyProcessor implements Processor{
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("nMessage ID is: " + exchange.getIn().getMessageId());
System.out.println("nFile`s content is: " + exchange.getIn().getBody(String.class) + "n");
}
}
Any help would be appreciated.
java spring spring-boot activemq dead-letter
java spring spring-boot activemq dead-letter
edited Nov 9 at 13:11
asked Nov 9 at 11:06
Serhat
388
388
add a comment |
add a comment |
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53224541%2factivemq-dead-letter-queue-with-spring-boot%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown