Thursday, 4 January 2018

SpringBoot RabbitMQ Integration

SpringBoot RabbitMQ Integration


Spring Integration enables lightweight messaging within Spring-based applications and supports integration with
external systems via declarative adapters. Those adapters provide a higher-level of abstraction over Spring's support
for remoting, messaging, and scheduling. Spring Integration's primary goal is to provide a simple model for building
enterprise integration solutions while maintaining the separation of concerns that is essential for producing
maintainable, testable code.

Video:


Why RabbitMQ:

  • Decouple message publisher and consumer
  • Store the messages
  • Routing of messages
  • Monitoring and management of messages

SpringBoot RabbitMQ Diagram





Producer:



In this spring boot module, We will config rabbitMQ details including exchange, queue and we will send message to queue.


Exchange:

We will create one exchange and send message to this exchange.


Routing :

We will create routing key and bind exchange with specific queue with routing key.


Queue:



Here, our message will be stored. Once consumed by consumers, message will be removed from queue.

RabbitMQ Configuration in spring boot:



application.properties


server.port=8001


#Rabbitmq configuration
spring.rabbitmq.host = 127.0.0.1
spring.rabbitmq.port = 5672
spring.rabbitmq.username = guest
spring.rabbitmq.password = guest


RobbitMqConfig.java
package com.sachin4java.rabbitmqproducer.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


import com.sachin4java.rabbitmqproducer.listener.MessageListener;

@Configuration
public class RobbitMqConfig {
public static final String ROUTING_KEY = "my.queue.key";
@Bean
Queue queue() {
return new Queue(ROUTING_KEY, true);
}


@Bean
TopicExchange exchange() {
return new TopicExchange("my_queue_exchange");
}


@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
//Below highlighted for RabbitMQ listener


@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(ROUTING_KEY);
container.setMessageListener(listenerAdapter);
return container;
}


@Bean
MessageListenerAdapter myQueueListener(MessageListener listener) {
return new MessageListenerAdapter(listener, "onMessage");
}
}






Module name
Port
Type
Dependencies
rabbitmq-producer
8001
Sender
Web, AMQP
rabbitmq-consumer
8002
Receiver
Web, AMQP


Rabbitmq-producer module:


ProducerController.java


package com.sachin4java.rabbitmqproducer.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


import com.sachin4java.rabbitmqproducer.producer.MessageProducer;


@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
   private MessageProducer messageProducer;
   
@GetMapping
public void produce(@RequestParam String message) {
messageProducer.sendMessage(message);
   
}


}


MessageProducer.java


package com.sachin4java.rabbitmqproducer.producer;


import java.util.Date;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


import com.sachin4java.rabbitmqproducer.config.RobbitMqConfig;


@Component
public class MessageProducer {
@Autowired
   private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {


System.out.println(new Date());
rabbitTemplate.convertAndSend(RobbitMqConfig.ROUTING_KEY, message);
   System.out.println("Is listener returned ::: "+rabbitTemplate.isReturnListener());
   System.out.println(new Date());
}
}
RobbitMqConfig.java


package com.sachin4java.rabbitmqproducer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


import com.sachin4java.rabbitmqproducer.listener.MessageListener;

@Configuration
public class RobbitMqConfig {
public static final String ROUTING_KEY = "my.queue.key";
@Bean
Queue queue() {
return new Queue(ROUTING_KEY, true);
}


@Bean
TopicExchange exchange() {
return new TopicExchange("my_queue_exchange");
}


@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}


/*@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(ROUTING_KEY);
container.setMessageListener(listenerAdapter);
return container;
}


@Bean
MessageListenerAdapter myQueueListener(MessageListener listener) {
return new MessageListenerAdapter(listener, "onMessage");
}*/
}


Rabbitmq-consumer module:



RobbitMqConfig.java


package com.sachin4java.rabbitmqproducer.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


import com.sachin4java.rabbitmqproducer.listener.MessageListener;

@Configuration
public class RobbitMqConfig {
public static final String ROUTING_KEY = "my.queue.key";
@Bean
Queue queue() {
return new Queue(ROUTING_KEY, true);
}


@Bean
TopicExchange exchange() {
return new TopicExchange("my_queue_exchange");
}


@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}


@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(ROUTING_KEY);
container.setMessageListener(listenerAdapter);
return container;
}


@Bean
MessageListenerAdapter myQueueListener(MessageListener listener) {
return new MessageListenerAdapter(listener, "onMessage");
}
}


MessageListener.java


package com.sachin4java.rabbitmqproducer.listener;


public interface MessageListener {
public void onMessage(String message);


}


MessageListenerImpl.java


package com.sachin4java.rabbitmqproducer.listener;


import java.util.Date;


import org.springframework.stereotype.Component;


@Component
public class MessageListenerImpl implements MessageListener{


@Override
public void onMessage(String message) {
System.out.println(new Date());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Message Received:"+message);
System.out.println(new Date());
}

}

GIT Repository


No comments:

Post a Comment

Extract error records while inserting into db table using JDBCIO apache beam in java

 I was inserting data into postgres db using apache beam pipeline. it works perfectly with JdbcIO write of apache beam library. But, now, i ...