SpringBoot RabbitMQ Integration
Spring Integration enables lightweight messaging within Spring-based applications and supports integration with
external systems via declarative adapters. Those adapters provide a higher-level of abstraction over Spring's support
for remoting, messaging, and scheduling. Spring Integration's primary goal is to provide a simple model for building
enterprise integration solutions while maintaining the separation of concerns that is essential for producing
maintainable, testable code.
Video:
VIDEO
Why RabbitMQ:
SpringBoot RabbitMQ Diagram
Producer:
In this spring boot module, We will config rabbitMQ details including exchange, queue and we will send message to queue.
Exchange:
We will create one exchange and send message to this exchange.
Routing :
We will create routing key and bind exchange with specific queue with routing key.
Queue:
Here, our message will be stored. Once consumed by consumers, message will be removed from queue.
RabbitMQ Configuration in spring boot:
application.properties
server.port=8001
#Rabbitmq configuration
spring.rabbitmq.host = 127.0.0.1
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest
RobbitMqConfig.java
package com.sachin4java.rabbitmqproducer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.sachin4java.rabbitmqproducer.listener.MessageListener;
@Configuration
public class RobbitMqConfig {
public static final String ROUTING_KEY = "my.queue.key";
@Bean
Queue queue() {
return new Queue(ROUTING_KEY, true);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("my_queue_exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
//Below highlighted for RabbitMQ listener
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(ROUTING_KEY);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter myQueueListener(MessageListener listener) {
return new MessageListenerAdapter(listener, "onMessage");
}
}
Module name
Port
Type
Dependencies
rabbitmq-producer
8001
Sender
Web, AMQP
rabbitmq-consumer
8002
Receiver
Web, AMQP
Rabbitmq-producer module:
ProducerController.java
package com.sachin4java.rabbitmqproducer.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.sachin4java.rabbitmqproducer.producer.MessageProducer;
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private MessageProducer messageProducer;
@GetMapping
public void produce(@RequestParam String message) {
messageProducer.sendMessage(message);
}
}
MessageProducer.java
package com.sachin4java.rabbitmqproducer.producer;
import java.util.Date;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.sachin4java.rabbitmqproducer.config.RobbitMqConfig;
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
System.out.println(new Date());
rabbitTemplate.convertAndSend(RobbitMqConfig.ROUTING_KEY, message);
System.out.println("Is listener returned ::: "+rabbitTemplate.isReturnListener());
System.out.println(new Date());
}
}
RobbitMqConfig.java
package com.sachin4java.rabbitmqproducer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.sachin4java.rabbitmqproducer.listener.MessageListener;
@Configuration
public class RobbitMqConfig {
public static final String ROUTING_KEY = "my.queue.key";
@Bean
Queue queue() {
return new Queue(ROUTING_KEY, true);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("my_queue_exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
/*@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(ROUTING_KEY);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter myQueueListener(MessageListener listener) {
return new MessageListenerAdapter(listener, "onMessage");
}*/
}
Rabbitmq-consumer module:
RobbitMqConfig.java
package com.sachin4java.rabbitmqproducer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.sachin4java.rabbitmqproducer.listener.MessageListener;
@Configuration
public class RobbitMqConfig {
public static final String ROUTING_KEY = "my.queue.key";
@Bean
Queue queue() {
return new Queue(ROUTING_KEY, true);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("my_queue_exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(ROUTING_KEY);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter myQueueListener(MessageListener listener) {
return new MessageListenerAdapter(listener, "onMessage");
}
}
MessageListener.java
package com.sachin4java.rabbitmqproducer.listener;
public interface MessageListener {
public void onMessage(String message);
}
MessageListenerImpl.java
package com.sachin4java.rabbitmqproducer.listener;
import java.util.Date;
import org.springframework.stereotype.Component;
@Component
public class MessageListenerImpl implements MessageListener{
@Override
public void onMessage(String message) {
System.out.println(new Date());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Message Received:"+message);
System.out.println(new Date());
}
}
GIT Repository