RabbitMQ的安装就不说了。
官方文档: Using Spring AMQP
最近粗略地研究了下在Spring中调用RabbitMQ服务。 下面是简单地配置与使用实例。
1.1配置
1.1.1 引入maven工具库
我们在Spring框架中进行开发, 因此选择与spring集成好的mq客户端。
在pom.xml的依赖中加入如下配置:
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.5.RELEASE</version> </dependency>
1.1.2 主配置文件spring-mq.xml
新建主配置文件rabbit-mq.xml, 并于web.xml中引入:
Spring-mq.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd"> <!-- 连接服务配置 --> <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" username="test" password="test" port="5672" /> <rabbit:admin connection-factory="connectionFactory"/> <!-- queue 队列声明--> <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/> <rabbit:queue id="queue_rabbit" durable="true" auto-delete="false" exclusive="false" name="queue_rabbit"/> <!-- exchange queue binging key 绑定 --> <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange"> <rabbit:bindings> <rabbit:binding queue="queue_one" key="queue_one_key"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- spring template声明--> <rabbit:template exchange="my-mq-exchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jackson2JsonMessageConverter"/> <bean id="jackson2JsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象--> <task:executor id="taskExecutor" pool-size="4-256" queue-capacity="128" /> <!-- 监听处理类 --> <bean id="queueOneLitener" class="com.test.sender.listener.QueueOneLitener" /> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor"> <rabbit:listener queues="queue_rabbit" ref="queueOneLitener" method="onMessage" /> </rabbit:listener-container> </beans> 配置文件详解:
- 服务器配置(rabbit:connection-factory)。 Rabbit-mq的队列都存在于服务器。 我们在配置文件中指定rabbit-mq服务器的ip, 端口以及用户名密码。
- Queue队列声明(rabbit:queue)。 无论是消费者还是生产者, 需要用到的队列都得在配置文件中进行声明。
- 路由配置(rabbit:direct-exchange)。 通过exchange设置的路由规则,队列和某一key值绑定起来。
<rabbit:binding queue="queue_one" key="queue_one_key"/>
上边的这个例子中, 队列queue_one的key值为queue_one_key, 如果生产者产生一条key值为queue_one_key的消息, 消息通过上述路由后,便会被放到queue_one队列中。
- Template配置。 生产者通过template实例来发送消息。在其配置中, 需要指定路由(exchange),即:生产的消息需要通过什么样的路由规则放到相应的队列中。
<rabbit:template exchange="my-mq-exchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jackson2JsonMessageConverter"/>
- Listener配置。 如果代码中有消费者, 那就需要配置listener。
<rabbit:listener queues="queue_rabbit" ref="queueOneLitener" method="onMessage" />
上述listener监听的队列是queue_rabbit, 如果队列中有消息入队, 那么消息就被监听类捕获并处理。
1.1.3 生产者代码
// 注入生产者实例 @Resource(name="amqpTemplate") private RabbitTemplate template; // 发送消息。第一个参数是路由键值, 第二个参数是具体的消息 template.convertAndSend("queue_one_key","Hello!");
1.1.4 消费者监听代码
public class QueueOneLitener implements MessageListener{ @Override public void onMessage(Message message) { System.out.println(" data get by sender:" + new String(message.getBody())); } }
如果队列收到了消息, 相应的队列监听类的onMessage被触发。我们通过new String(message.getBody())获取具体的消息