Spring中使用RabbitMQ

RabbitMQ的安装就不说了。

官方文档:  Using Spring AMQP

 

最近粗略地研究了下在Spring中调用RabbitMQ服务。 下面是简单地配置与使用实例。

1.1配置

1.1.1 引入maven工具库

我们在Spring框架中进行开发, 因此选择与spring集成好的mq客户端。

在pom.xml的依赖中加入如下配置:

  <dependency>
     <groupId>org.springframework.amqp</groupId>
     <artifactId>spring-rabbit</artifactId>
     <version>1.3.5.RELEASE</version>
  </dependency>

 

1.1.2 主配置文件spring-mq.xml

新建主配置文件rabbit-mq.xml, 并于web.xml中引入:

Spring-mq.xml:

<?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 xmlns:task="http://www.springframework.org/schema/task"
 xsi:schemaLocation="http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
     http://www.springframework.org/schema/rabbit 
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
     http://www.springframework.org/schema/task  
     http://www.springframework.org/schema/task/spring-task-4.1.xsd">
  
 <!-- 连接服务配置   -->
 <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" username="test"
 password="test" port="5672"  />
  
 <rabbit:admin connection-factory="connectionFactory"/>
  
 <!-- queue 队列声明-->
 <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>
 <rabbit:queue id="queue_rabbit" durable="true" auto-delete="false" exclusive="false" name="queue_rabbit"/>
  
 <!-- exchange queue binging key 绑定 -->
 <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
 <rabbit:bindings>
 <rabbit:binding queue="queue_one" key="queue_one_key"/>
 </rabbit:bindings>
 </rabbit:direct-exchange>
  
 <!-- spring template声明-->
 <rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jackson2JsonMessageConverter"/>
  
 <bean id="jackson2JsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
  
  
 <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
  
 <task:executor id="taskExecutor" pool-size="4-256" queue-capacity="128" />
  
 <!--  监听处理类    -->
 <bean id="queueOneLitener" class="com.test.sender.listener.QueueOneLitener" />
 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
 <rabbit:listener queues="queue_rabbit" ref="queueOneLitener"  method="onMessage"  />
 </rabbit:listener-container>
 </beans>
  
 配置文件详解:
  1. 服务器配置(rabbit:connection-factory)。 Rabbit-mq的队列都存在于服务器。 我们在配置文件中指定rabbit-mq服务器的ip, 端口以及用户名密码。
  1. Queue队列声明(rabbit:queue)。 无论是消费者还是生产者, 需要用到的队列都得在配置文件中进行声明。
  1. 路由配置(rabbit:direct-exchange)。 通过exchange设置的路由规则,队列和某一key值绑定起来。
     <rabbit:binding queue="queue_one" key="queue_one_key"/>

上边的这个例子中, 队列queue_one的key值为queue_one_key 如果生产者产生一条key值为queue_one_key的消息, 消息通过上述路由后,便会被放到queue_one队列中。

  1. Template配置。 生产者通过template实例来发送消息。在其配置中, 需要指定路由(exchange),即:生产的消息需要通过什么样的路由规则放到相应的队列中。
<rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jackson2JsonMessageConverter"/>
  1. Listener配置。 如果代码中有消费者, 那就需要配置listener。
<rabbit:listener queues="queue_rabbit" ref="queueOneLitener"  method="onMessage"  />

上述listener监听的队列是queue_rabbit, 如果队列中有消息入队, 那么消息就被监听类捕获并处理。

 

1.1.3 生产者代码

 // 注入生产者实例
 @Resource(name="amqpTemplate")
 private RabbitTemplate template;
 // 发送消息。第一个参数是路由键值, 第二个参数是具体的消息
 template.convertAndSend("queue_one_key","Hello!");

 

1.1.4 消费者监听代码

 public class QueueOneLitener implements  MessageListener{
    @Override
     public void onMessage(Message message) {
          System.out.println(" data get by sender:" + new String(message.getBody()));
    }
 }

如果队列收到了消息, 相应的队列监听类的onMessage被触发。我们通过new String(message.getBody())获取具体的消息