下载镜像
方式一:在线拉取
1
| docker pull rabbitmq:3.8-management
|
1.2.安装MQ
执行下面的命令来运行MQ容器:
1 2 3 4 5 6 7 8 9 10
| docker run \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ -v mq-plugins:/plugins \ --name mq \ --hostname mq \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3.8-management
|
MQ概念
什么是MQ :
RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协 议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。
整体上是一个生产者和消费者模型,主要负责接受、存储和转发消息。可以把消息传递的过程想象成;当一个包裹送到邮具,邮局会暂存并会最终通过邮递员送到收件人的手上。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。
https://www.rabbitmq.com/news.html
核心概念:
生产者:产生数据发送消息的程序是生产者
交换机: 接受来自生产者的消息,并将消息推送到队列中。交换机必须 确切知道如何处理到他接受的消息,是将消息推送到特定的队列还是多个对队列,或是将消息丢弃,由交换机类型决定。
队列:
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

Connection:
publisher/consumer 和 broker 之间的 TCP 连接
Channel:
如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程 序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Virtual host:
出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似 于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出 多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Exchange(交换器)
Exchange(交换器) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者) 。根据分发规则,匹配查询表中的 RoutingKey,找到交换机与消息队列的绑定,分发消息到 queue 中去。
RabbitMQ 的 Exchange(交换器) 有4种类型,不同的类型对应着不同的路由策略:
direct(默认), fanout, topic, 和 headers,不同类型的Exchange转发消息的策略有所区别。
生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
Binding:
exchange 和 queue 之间的虚拟连接
RabbitMQ 中通过 Binding(绑定) 将 Exchange(交换器) 与 Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图 所示。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一 个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。

生产者将消息发送给交换器时,需要一个RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视,而是将消息路由到所有绑定到该交换器的队列中
Queue(消息队列)
Queue(消息队列) 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免的消息被重复消费。
Broker(消息中间件的服务节点)
接收和分发消息的应用,RabbitMQ 服务节点,或者 RabbitMQ服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。

生产者将消息存入 RabbitMQ Broker,以及消费者从Broker中消费数据的整个流程
功能特点
具体特点可以从易用性、扩展性、可靠性和高可用等方面进行分析
- 可靠性:RabbitMQ使用一些机制来保证消息的可靠性,如持久化、传输确认及发布确认等
- 灵活路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
- 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节 点。
- 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可 用。
- 支持多种协议: RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间 件协议。
- 多语言客户端:RabbitMQ几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、 JavaScript等。
- 易用的管理界面:RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中 的节点等。在安装 RabbitMQ 的时候会介绍到,安装好 RabbitMQ 就自带管理界面。
- 插件机制:RabbitMQ 提供了许多插件,以实现从多方面进行扩展

MQ的分类:
1.ActiveMQ
优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较 低的概率丢失数据
缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。
2.Kafka
大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件, 以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥 着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。
优点:
- 吞吐量高 ,性能卓越,单机写入 TPS 约在百万条/秒。
- 时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,
- 消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;
- 有优秀的第三方 Kafka Web 管理界面 Kafka-Manager;
- 在日志领域比较成熟,被多家公司和多个开源项目使用;
- 功能支持: 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
缺点:
- Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;
- 支持消息顺序, 但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
3.RocketMQ
RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一 些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。
优点:
- 单机吞吐量十万级,可用性非常高,
- 分布式架构,消息可以做到 0 丢失,
- MQ 功能较为完善,分布式,易扩展,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,
缺点:
- 支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;
- 社区活跃度一般,没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
MQ 的选择
1.Kafka
Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,适合日志采集
2.RocketMQ
天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。
3.RabbitMQ
结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分 方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。
工作队列(又名:任务队列)

工作队列(又名:任务队列)
对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
轮循分发(Round-robin dispatching)
任务队列的一个好处就是能很简单的并行工作。如果我们有积压的工作,我们只需要添加更多的工作者,很容易扩展。
首先,让我们同时跑两个工作者实例。它们都将从队列中获取消息,结果怎样,我们拭目以待。
你需要开三个控制台,两个跑工作者程序。这两个就是我们的消费者 - C1
和 C2
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package mq.Worker;
import com.rabbitmq.client.Channel; import mq.utils.RabbitMqUtils;
import java.util.Scanner;
public class Task01 { private static final String QUEUE_NAME="hello"; public static void main(String[] args) throws Exception { try(Channel channel= RabbitMqUtils.getChannel();) { channel.queueDeclare(QUEUE_NAME,false,false,false,null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送消息完成:"+message); } } }
|
Worker-1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package mq.Worker;
import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import mq.utils.RabbitMqUtils;
public class Worker01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = ((consumerTag, message) -> {
final String s = new String(message.getBody()); System.out.println("接受到的消息" + s); });
CancelCallback cancelCallback = (consumerTag -> { System.out.println(consumerTag+"消费者取消接口回调逻辑"); }); System.out.println("C1 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); }
}
|
Work-2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package mq.Worker;
import com.rabbitmq.client.*; import mq.utils.RabbitMqUtils;
public class Worker02 { private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody()); System.out.println(message); }; CancelCallback cancelCallback = (consumerTag) -> { System.out.println("消息消费被中断"); }; System.out.println("C2 消费者启动等待消费......");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); }
}
|

第三个控制台我们发布新的任务。一旦你启动好了消费者们,你就可以发布消息了:即执行生产者程序NewTask.java
。以下是执行结果:

rabbitmq-newtask
下面是当执行上面的程序之后,两个worker的输出:
worker-1

worker-1

worker-2

默认的,RabbitMQ会有序的将一个个消息交付给下一个消费者。平均每个消费者将会得到相同数量的消息。这种分发消息的方式叫轮循(round-robin)。
消息应答:
概念
完成一个任务需要花费一些时间,你可以想象一个需要较长时间完成的任务在执行的中途中挂了会发生什么。我们当前的代码,一旦RabbitMQ将消息交付给客户,它将立即从内存中被移除。在这个例子中,如果你在执行过程中杀死一个工作者我们将丢失这个消息。我们也会丢失所有分发给指定的这个工作者但是还没有处理的消息。
但是我们不想丢失任何任务。如果一个工作者挂了,我们希望这个任务能交付给另一个工作者。
为了确保消息永远不会丢失,RabbitMQ提供了消息确认机制,消费者向RabbitMQ中发送一个确认表示消息已经接收、处理并且RabbitMQ可以自由的删除它了。
如果一个消费者挂了(通道关闭,连接关闭或者TCP连接丢失)没有发送确认,RabbitMQ将会理解成消息没有被完全处理并将消息发回队列。如果这个时候有其他消费者在线,它将被快速的交付给另一个消费者。通过这种方式能确保没有消息丢失,即使工作者偶尔挂掉。
这里没有消息超时,当消费者挂了RabbitMQ将会重新交付消息。如果一个消息的处理过程花费了很长很长的时间,这个是允许的。
自动应答
在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用
消息应答的方法
A.Channel.basicAck(用于肯定确认)
- RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)
C.Channel.basicReject(用于否定确认)
- 与 Channel.basicNack 相比少一个参数 不处理该消息了直接拒绝,可以将其丢弃
Multiple 的解释
手动应答的方式可以批量应答并且减少网路拥堵

multiple 的 true 和 false 代表不同意思 true 代表批量应答
channel 上未应答的消息 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
false 同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

消息自动重新入队
如果消费者因为某种原因断开连接,导致消息并未发送ACK确认,RabbitMq将了解的消息么没有完全删除,对其进行重新排队。如果有其他的消费者 ,会重新分配给下一个消费者。这样可以在确保不会丢失任何消息

手动应答效果演示:
我们模拟了一个生产者和两个消费者,我们将一个消费者work03设置消费时常1秒,另外一个设置为30秒,我们在使用的时候,发送11 、22,work03快速响应,work04 消费22,我关闭work04 之后,22会给work03进行消费



1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package mq.three;
import com.rabbitmq.client.Channel; import mq.utils.RabbitMqUtils;
import java.nio.charset.StandardCharsets; import java.util.Scanner;
public class Task2 {
public static final String TASK_QUEUE_NAME ="ack_queue";
public static void main(String[] args) throws Exception { final Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){ String message = scanner.next();
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发送消息"+message); }
} }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package mq.three;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import mq.utils.RabbitMqUtils; import mq.utils.SleepUtils;
public class Work03 { private static final String ACK_QUEUE_NAME="ack_queue";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C1 等待接收消息处理时间较短"); DeliverCallback deliverCallback=(consumerTag,delivery)->{ String message= new String(delivery.getBody()); SleepUtils.sleep(1); System.out.println("接收到消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; boolean autoAck=false; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{ System.out.println(consumerTag+"消费者取消消费接口回调逻辑"); }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package mq.three;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import mq.utils.RabbitMqUtils; import mq.utils.SleepUtils;
public class Work04 { private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); System.out.println("C2 等待接收消息处理时间较长"); DeliverCallback deliverCallback=(consumerTag,delivery)->{ String message= new String(delivery.getBody()); SleepUtils.sleep(30); System.out.println("接收到消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; boolean autoAck=false; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{ System.out.println(consumerTag+"消费者取消消费接口回调逻辑"); }); } }
|

Rabbit持久化:
概念:
如何保障当RabbitMQ服务停掉以后消息生产者发送过来的消息不丢失。默认情况下RabbitMQ退出操作或者由于某种原因崩溃时,它忽视队列和消息。确保消息不会丢失需要做的两件事: 需要将消息和队列都标记为持久化。
队列持久化
需要在声明队列的时候把 durable 参数设置为持久化


消息持久化
要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添 加这个属性。
将消息标记为持久化
1 2 3 4
| String message = scanner.next();
channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发送消息"+message);
|
不公平分发
预取值
可以指定不同队列指定的数目

发布确认:
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的
消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker
就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队
列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传
给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置
basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信
道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调
方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消
息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
开启发布确认的方法
发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用

单个确认发布
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它
被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认
的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会
阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某
些应用程序来说这可能已经足够了。
批量确认发布
当发生故障导致发布出现问题时,不知道是哪个消息出现
问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种
方案仍然是同步的,也一样阻塞消息的发布。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| package mq.four;
import com.rabbitmq.client.Channel; import mq.utils.RabbitMqUtils;
import java.util.UUID;
public class ConfirmMessage {
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
ConfirmMessage.publishMessageBatch(); }
public static void publishMessageIndividually() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes()); boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("消息发送成功"); } } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms"); }
public static void publishMessageBatch() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect(); int batchSize = 100; int outstandingMessageCount = 0; long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = i + ""; channel.basicPublish("", queueName, null, message.getBytes()); outstandingMessageCount++; if (outstandingMessageCount == batchSize) { channel.waitForConfirms(); outstandingMessageCount = 0; } } if (outstandingMessageCount > 0) { channel.waitForConfirms(); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + "ms"); } }
|
异步确认发布
消息生产者进行发送,不用等消费进行确认,而直接使用回调,消息需要存储在map
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,
比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传
递。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| public static void publishMessageAsync() throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect();
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true); confirmed.clear(); }else{ outstandingConfirms.remove(sequenceNumber); } }; ConfirmCallback nackCallback = (sequenceNumber, multiple) -> { String message = outstandingConfirms.get(sequenceNumber); System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber); };
channel.addConfirmListener(ackCallback, null); long begin = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = "消息" + i;
outstandingConfirms.put(channel.getNextPublishSeqNo(), message); channel.basicPublish("", queueName, null, message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms"); } }
|
5.1Exchanges
5.1.1 EXcahnges概念
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,**生产者只能将消息发送到交换机(exchange)**,交换机工作的内容非常简单,一方面它接收来
自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消
息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

5.1.2 Exchanges的类型
总共有以下类型
直接(direct),主题(topic),标题(headers),扇出(fanout)
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称
的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连
接,队列将被自动删除。
创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();
创建出来之后长成这样:

绑定(bindings)
binding 是exchange和queue之间的桥梁,告诉我们exchange和那个队列进行绑定

5.4. Fanout
发消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| package mq.Fanout;
import com.rabbitmq.client.Channel; import mq.utils.RabbitMqUtils;
import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException;
public class EmitLog { private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) { try (Channel channel = RabbitMqUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner sc = new Scanner(System.in); System.out.println("请输入信息"); while (sc.hasNext()) { String message = sc.nextLine(); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package mq.Fanout;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import mq.utils.RabbitMqUtils;
public class ReceiveLogs01 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接受消息,把接受到的消息打印在屏幕上");
DeliverCallback deliverCallback = ((consumerTag, message) -> {
System.out.println("控制台打印消息" + new String(message.getBody(), "UTF-8")); });
channel.basicConsume(queueName,true,deliverCallback,(consumerTag, sig) -> {});
}
}
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package mq.Fanout;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import mq.utils.RabbitMqUtils;
public class ReceiveLogs02 {
private static final String EXCHANGE_NAME ="logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("2等待接受消息,把接受到的消息打印在屏幕上");
DeliverCallback deliverCallback = ((consumerTag, message) -> {
System.out.println("控制台打印消息" + new String(message.getBody(), "UTF-8")); });
channel.basicConsume(queueName,true,deliverCallback,(consumerTag, sig) -> {});
}
}
|


5.5.Direct exchange
什么是绑定bindings,绑定是队列和交换机之间的桥梁关系,队列只对它绑定的交换机感兴趣,绑定使用参数:routingkey表示该参数为binging key,创建绑定的使用代码 channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”),绑定之后由交换类型决定。就是向指定的的路由发送信息

在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange,
队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列
Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
多重绑定

如果exchange的绑定类型是direct,但是他绑定的多个队列的key都是相同的,在这种情况下虽然绑定类型是direct
,但是它绑定的多个队列的key如果相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示

发送日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package mq.direct;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import mq.utils.RabbitMqUtils;
import java.util.HashMap; import java.util.Map;
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("info", "普通 info 信息"); bindingKeyMap.put("warning", "警告 warning 信息"); bindingKeyMap.put("error", "错误 error 信息"); bindingKeyMap.put("debug", "调试 debug 信息"); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息:" + message); } } }
}
|
接受日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package mq.direct;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import mq.utils.RabbitMqUtils;
public class ReceiveLogsDirect01 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "disk"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "error"); System.out.println("等待接受的消息..."); DeliverCallback deliverCallback = ((consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message; System.out.println("错误日志已经接收"); }); channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
|
接受日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package mq.direct;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import mq.utils.RabbitMqUtils;
public class ReceiveLogsDirect02 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "console"; channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消 息:"+message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
|
5.6.Topic
使用fanout能够随意进行广播日志,使用的是direct交换机,能够有选择的接受日志有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候就只能使用 topic 类型,针对接受的
绑定带有后缀的数据
绑定关系案例:
下图绑定关系如下
Q1–>绑定的是
中间带 orange 带 3 个单词的字符串(.orange.)
Q2–>绑定的是
最后一个单词是 rabbit 的 3 个单词(..rabbit)
第一个单词是 lazy 的多个单词(lazy.#)

上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的
quick.orange.rabbit 被队列 Q1Q2 接收到
lazy.orange.elephant 被队列 Q1Q2 接收到
quick.orange.fox 被队列 Q1 接收到
lazy.brown.fox 被队列 Q2 接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配 Q2
当队列绑定关系是下列这种情况时需要引起注意
**当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 **fanout 了
*如果队列绑定键当中没有#和出现,那么该队列绑定类型就是 direct **

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package mq.topic;
import com.rabbitmq.client.Channel; import mq.utils.RabbitMqUtils;
import java.util.HashMap; import java.util.Map;
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到"); bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到"); bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到"); bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到"); bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次"); bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2"); for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){ String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } } }
|
ReceiveLogsTopic01
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package mq.topic;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import mq.utils.RabbitMqUtils;
public class ReceiveLogsTopic01 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName="Q1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" 接收队列 :"+queueName+" 绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
|
ReceiveLogsTopic02
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package mq.topic;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import mq.utils.RabbitMqUtils;
public class ReceiveLogsTopic02 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName="Q2"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); System.out.println("等待接收消息....."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" 接收队列 :"+queueName+" 绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }
}
|
EmitLogTopic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package mq.topic;
import com.rabbitmq.client.Channel; import mq.utils.RabbitMqUtils;
import java.util.HashMap; import java.util.Map;
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { try (Channel channel = RabbitMqUtils.getChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到"); bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到"); bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到"); bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到"); bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次"); bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2"); for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){ String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } } }
|




6.1死信队列
3.SpringAMQP
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp


SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
3.1.Basic Queue 简单队列模型
在父工程mq-demo中引入依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
3.1.1.消息发送
首先配置MQ地址,在publisher服务的application.yml中添加配置:
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.150.101 port: 5672 virtual-host: / username: itcast password: 123321
|
然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package cn.itcast.mq.spring;
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAmqpTest {
@Autowired private RabbitTemplate rabbitTemplate;
@Test public void testSimpleQueue() { String queueName = "simple.queue"; String message = "hello, spring amqp!"; rabbitTemplate.convertAndSend(queueName, message); } }
|
3.1.2.消息接收
首先配置MQ地址,在consumer服务的application.yml中添加配置:
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.150.101 port: 5672 virtual-host: / username: itcast password: 123321
|
然后在consumer服务的cn.itcast.mq.listener
包中新建一个类SpringRabbitListener,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13
| package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) throws InterruptedException { System.out.println("spring 消费者接收到消息:【" + msg + "】"); } }
|
3.1.3.测试
启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息
3.2.WorkQueue
Work queues,也被称为(Task queues),任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,速度就能大大提高了。
3.2.1.消息发送
这次我们循环发送,模拟大量消息堆积现象。
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@Test public void testWorkQueue() throws InterruptedException { String queueName = "simple.queue"; String message = "hello, message_"; for (int i = 0; i < 50; i++) { rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
|
3.2.2.消息接收
要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:
1 2 3 4 5 6 7 8 9 10 11
| @RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); }
@RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }
|
注意到这个消费者sleep了1000秒,模拟任务耗时。
3.2.3.测试
启动ConsumerApplication后,在执行publisher服务中刚刚编写的发送测试方法testWorkQueue。
可以看到消费者1很快完成了自己的25条消息。消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。这样显然是有问题的。
3.2.4.能者多劳
在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:
1 2 3 4 5
| spring: rabbitmq: listener: simple: prefetch: 1
|
3.2.5.总结
Work模型的使用:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量
3.3.发布/订阅
发布订阅的模型如图:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.4.Fanout
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。

在广播模式下,消息发送流程是这样的:
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 4) 交换机把消息发送给绑定过的所有队列
- 5) 订阅队列的消费者都能拿到消息
我们的计划是这样的:
- 创建一个交换机 itcast.fanout,类型是Fanout
- 创建两个队列fanout.queue1和fanout.queue2,绑定到交换机itcast.fanout

3.4.1.声明队列和交换机
Spring提供了一个接口Exchange,来表示所有不同类型的交换机:

在consumer中创建一个类,声明队列和交换机:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class FanoutConfig {
@Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); }
@Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); }
@Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); }
@Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); }
@Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
|
3.4.2.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
1 2 3 4 5 6 7 8
| @Test public void testFanoutExchange() { String exchangeName = "itcast.fanout"; String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
|
3.4.3.消息接收
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
1 2 3 4 5 6 7 8 9
| @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); }
@RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }
|
3.4.4.总结
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
- Queue
- FanoutExchange
- Binding
3.5.Direct
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的 Routing key
完全一致,才会接收到消息
案例需求如下:
利用@RabbitListener声明Exchange、Queue、RoutingKey
在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
在publisher中编写测试方法,向itcast. direct发送消息

3.5.1.基于注解声明队列和交换机
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】"); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue2"), exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT), key = {"red", "yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】"); }
|
3.5.2.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
1 2 3 4 5 6 7 8 9
| @Test public void testSendDirectExchange() { String exchangeName = "itcast.direct"; String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!"; rabbitTemplate.convertAndSend(exchangeName, "red", message); }
|
3.5.3.总结
描述下Direct交换机与Fanout交换机的差异?
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
3.6.Topic
3.6.1.说明
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者 item.spu
item.*
:只能匹配item.spu
图示:

解释:
- Queue1:绑定的是
china.#
,因此凡是以 china.
开头的routing key
都会被匹配到。包括china.news和china.weather
- Queue2:绑定的是
#.news
,因此凡是以 .news
结尾的 routing key
都会被匹配。包括china.news和japan.news
案例需求:
实现思路如下:
并利用@RabbitListener声明Exchange、Queue、RoutingKey
在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
在publisher中编写测试方法,向itcast. topic发送消息

3.6.2.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法:
1 2 3 4 5 6 7 8 9 10 11 12
|
@Test public void testSendTopicExchange() { String exchangeName = "itcast.topic"; String message = "喜报!孙悟空大战哥斯拉,胜!"; rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
|
3.6.3.消息接收
在consumer服务的SpringRabbitListener中添加方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue1"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "china.#" )) public void listenTopicQueue1(String msg){ System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】"); }
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic.queue2"), exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC), key = "#.news" )) public void listenTopicQueue2(String msg){ System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】"); }
|
3.6.4.总结
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey必须是多个单词,以
**.**
分割
- Topic交换机与队列绑定时的bindingKey可以指定通配符
#
:代表0个或多个词
*
:代表1个词
3.7.消息转换器
之前说过,Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
我们来测试一下。
3.7.1.测试默认转换器
我们修改消息发送的代码,发送一个Map对象:
1 2 3 4 5 6 7 8 9
| @Test public void testSendMap() throws InterruptedException { Map<String,Object> msg = new HashMap<>(); msg.put("name", "Jack"); msg.put("age", 21); rabbitTemplate.convertAndSend("simple.queue","", msg); }
|
停止consumer服务
发送消息后查看控制台:

3.7.2.配置JSON转换器
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
在publisher和consumer两个服务中都引入依赖:
1 2 3 4 5
| <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> <version>2.9.10</version> </dependency>
|
配置消息转换器。
在启动类中添加一个Bean即可:
1 2 3 4
| @Bean public MessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); }
|