RabbitMQ

什么是RabbitMQ?

高性能的异步通信组件

时效性差,并发性能高

同步异步调用

同步调用

同步调用的优势:

时效性强,等待到结果后才返回

问题:

拓展性差

性能下降

级联失败问题

异步调用

异步调用的三个角色

消息发送者:投递消息的人,调用方

消息代理:管理、暂存、转发消息

消息接收者: 接受消息的人,服务提供方

优势:

解除耦合,拓展性强

无需等待,性能好

故障隔离

缓存消息,流量削峰填谷

问题:

不能及时得到调用结果,时效性差

不能确定下游业务是否成功

业务安全完全依赖于消息代理(Broker)的可靠性

RabbitMQ开始

RabbitMQ的安装

Docker镜像拉取

docker pull rabbitmq

部署

docker run -d –name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq

创建用户

rabbitmqctl add_user admin your_password

如果拉取镜像部署完,并且端口已开放,还是无法访问,这说明RabbitMQ管理插件未启用,进入容器后,输入rabbitmq-plugins enable rabbitmq_management 即可

核心概念

publisher:消息发送者

consumer:消息的消费者

queue:队列,存储消息

exchange:交换机,负责路由、转发消息,没有存储消息的能力

virtual-host:虚拟主机,起到数据隔离的作用

快速入门

在RabbitMQ控制台完成下列操作:

新建队列hello.queue1和queue2

向默认的amp.fanout交换机发送一条消息

查看消息是否到达hello.queue1和hello.queue2

新建两个队列

交换机绑定两个队列,并向其发送消息

队列接受到消息

数据隔离

在rabbitmq控制台新建一个用户,给他admin权限,并为其创建一个虚拟主机,他只具有查看其它消息队列及其其它信息的权限,不能对别的用户的数据进行干涉,不同的虚拟主机之间出现数据隔离的现象。

JAVA客户端的使用

AMQP和Spring AMQP

AMQP:

Advanced Message Queuing Protocol,用于应用程序之间传递业务消息的开放标准协议。语言无关性-符合微服务中的独立性要求。

Spring AMQP:

基于AMQP的一套API规范,提供发送和接受消息的模板。

其中,Spring-AMQP是基础抽象,Spring-Rabbit是底层的默认实现。

快速入门

SpringAMQP收发消息

1.引入spring-boot-starter-amqp依赖

2.配置RabbitMQ服务端消息

3.利用RabbitTemplate发送消息

4.利用@RabbitListener注解声明要监听的队列,监听消息

用一个项目简单描述rabbitMQ消息的收发过程,再rabbitMQ的网页端注册一个hmall的用户,新建一个simple.queue的消息队列,用于模拟publisher消息发送端发送消息到simple.queue,由消息队列转发消息到consumer,consumer监听pulisher所发送的消息。

1.引入依赖

这里依赖是父项目中的,publisher与comsumer是两个子项目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/>
</parent>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>

2.配置RabbitMQ服务端

publisher发送消息端的yml配置

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /hmall
username: hmall
password: 123

3.利用RabbitTemplate发送消息

写一个publisher端的测试类,用于模拟发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
    @SpringBootTest
class PublisherApplicationTest {
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
void testSendMessage2Queue(){
String queueName = "simple.queue";
String msg = "hello,rabbit!";
rabbitTemplate.convertAndSend(queueName,msg);
}

}

4.配置consumer端的配置文件

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码

5.消息接收

利用RabbitListener来声明要监听的队列信息
将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。 可以看到方法体中接收的就是消息体的内容

1
2
3
4
5
6
7
8
@Slf4j
@Component
public class MqListener {
@RabbitListener(queues = "simple.queue")
public void mlsq(String msg){
System.out.println("收到的消息为:"+ msg);
}
}

6.测试

在启动consumer之前执行publisher的测试方法观察rabbitMQ网页端可看见未转发前的队列消息,但是当启动consumer类后再次执行测试方法则无法观察到网页端的消息,因为此时消息已经被消息队列转发到conumer端

启动consumer启动类,执行publisher的测试方法,验证可行性

成功consumer监听到了来自simple.queue的转发的消息,并执行了监听的方法

work queue

work模型(解决消息堆积问题)

多个消费者绑定到一个队列,可以加快消息处理速度

同一条消息只会被一个消费者处理

通过设置prefetch来控制消费者预取的消息数量,处理完一条消息再处理下一条,实现能者多劳

案例

在rabbitMQ控制台创建一个队列,名为work.queue

在publisher服务中定义测试方法,在1s内产生50条消息,发送到work.queue

在consumer服务中定义两个消息监听者,都监听work.queue队列

消费者1每秒处理50条消息,消费者2每秒处理5条消息

消息发送者发送50条消息

1
2
3
4
5
6
7
8
9
@Test
void testWorkQueue() throws InterruptedException {
String queueName = "work.queue";
for (int i = 0; i < 50; i++) {
String msg = "hello,workQueue!,msg:" + i;
rabbitTemplate.convertAndSend(queueName,msg);
Thread.sleep(20);
}
}

接收端接受消息

1
2
3
4
5
6
7
8
@RabbitListener(queues = "work.queue")
public void lwq1(String msg) throws InterruptedException {
System.out.println("c1收到的消息为:"+ msg);
}
@RabbitListener(queues = "work.queue")
public void lwq2(String msg) throws InterruptedException {
System.err.println("c2收到的消息为....:"+ msg);
}

接受消息的方式按照轮询的方式分配给消费者

当消费者性能不一致时

1
2
3
4
5
6
7
8
9
10
@RabbitListener(queues = "work.queue")
public void lwq1(String msg) throws InterruptedException {
System.out.println("c1收到的消息为:"+ msg);
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void lwq2(String msg) throws InterruptedException {
System.err.println("c2收到的消息为....:"+ msg);
Thread.sleep(200);
}

还是按照轮询的方式进行分配,并且出现了消息堆积的现象,性能高的消费者优先完成,性能差者消息堆积

解决办法

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /hmall
username: hmall
password: 123
listener:
simple:
prefetch: 1

加入prefetch,每次必须完成一条消息后才可以进行下次消息处理,性能更强的消费者分配的消息更多,能者多劳

Fanout交换机

fanout交换机

真实的生产环境都会经过exchange来发送消息,额不是直接发送到队列

交换机的类型有以下三种

Fanout:广播

Direct:定向

Topic:话题

作用:

接受publisher发送的消息

将消息按照规则路由到与之绑定的队列

FanoutExchange会将消息路由到每个绑定的队列

Fanout Exchange会将接受到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

案例

consumer服务中的两个消费者

1
2
3
4
5
6
7
8
@RabbitListener(queues = "fanout.queue1")
public void lft1(String msg) throws InterruptedException {
System.out.println("c1收到的f1消息为:"+ msg);
}
@RabbitListener(queues = "fanout.queue2")
public void lft2(String msg) throws InterruptedException {
System.err.println("c2收到的f2消息为....:"+ msg);
}

向交换机发送消息并广播

1
2
3
4
5
6
7
@Test
void testSendFanout(){
String exchange = "hmall.fanout";
String msg = "hello,fanout!";
rabbitTemplate.convertAndSend(exchange,null,msg);
}

与交换机绑定的消息队列,都收到了由fanout交换机广播到的消息并转发给对应的消费者

Direct交换机

direct交换机

每一个消息队列都与交换机设置一个BindingKey

发布者发布消息时,指定消息的Routingkey

交换机将消息路由到BindingKey与消息RoutingKey一致的队列

案例

新建一个交换机hmall.direct

新建两个队列 direct.queue1 direct.queue2

direct.queue1与交换机的bindingKey为 red 和 blue

direct.queue2与交换机的bindingKey为 red 和 yellow

1
2
3
4
5
6
7
8
9
//direct
@RabbitListener(queues = "direct.queue1")
public void ldq1(String msg) throws InterruptedException {
System.out.println("c1收到的d1消息为:"+ msg);
}
@RabbitListener(queues = "direct.queue2")
public void ldq2(String msg) throws InterruptedException {
System.err.println("c2收到的d2消息为....:"+ msg);
}

测试1

向交换机发送消息,指定消息的routingKey为red

1
2
3
4
5
6
@Test
void testSendDirect1(){
String exchange = "hmall.direct";
String msg = "red";
rabbitTemplate.convertAndSend(exchange,"red",msg);
}

因为direct1.queue1 和 direct.queue2都绑定了red,所以都能收到消息

测试2

1
2
3
4
5
6
@Test
void testSendDirect2(){
String exchange = "hmall.direct";
String msg = "yellow";
rabbitTemplate.convertAndSend(exchange,"yellow",msg);
}

因为只有direct.queue2绑定了yellow,所以只有消费者2接收到了消息

Topic交换机

Topic交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,可以用.分割

Queue与Exchange指定BindingKey时可以使用通配符

#: 表示0个或多个单词

*: 表示一个单词

案例

1
2
3
4
5
6
7
8
9
10
//china.#
@RabbitListener(queues = "topic.queue1")
public void ltq1(String msg) throws InterruptedException {
System.out.println("c1收到的t1消息为:"+ msg);
}
//#.news
@RabbitListener(queues = "topic.queue2")
public void ltq2(String msg) throws InterruptedException {
System.err.println("c2收到的t2消息为....:"+ msg);
}

测试1

1
2
3
4
5
6
@Test
void testSendTopic1(){
String exchange = "hmall.topic";
String msg = "china news";
rabbitTemplate.convertAndSend(exchange,"china.news",msg);
}

因为topic.queue1绑定的为china.#,topic.queue2绑定的为*.news两者都满足

测试2

1
2
3
4
5
6
@Test
void testSendTopic2(){
String exchange = "hmall.topic";
String msg = "china weather";
rabbitTemplate.convertAndSend(exchange,"china.weather",msg);
}

c1的bindingKey是china.#,故c1收到消息

测试3

1
2
3
4
5
6
@Test
void testSendTopic3(){
String exchange = "hmall.topic";
String msg = "word news";
rabbitTemplate.convertAndSend(exchange,"word.news",msg);
}

声明队列交换机

声明队列交换机

Queue: 用于声明队列,可以用工厂类QueueBuilder构建

Exchange: 用于声明交换机,可以用工厂类ExchangeBuilder构建

Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

基于Bean的声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Bean
public FanoutExchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange("hmall.fanout2").build();
// return new FanoutExchange("hmall.fanout2");
}

@Bean
public Queue fanoutQueue3(){
// return new Queue("fanout.queue3");
return QueueBuilder.durable("fanout.queue3").build();
}
@Bean
public Queue fanoutQueue4(){
// return new Queue("fanout.queue3");
return QueueBuilder.durable("fanout.queue4").build();
}

@Bean
public Binding fanoutBinding3(Queue fanoutQueue3,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}

@Bean
public Binding fanoutBinding4(){
//fanoutQueue4(),首先从spring容器中查找是否有相应的bean,有则从容器中直接拿
//没有则创建bean
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
}

当使用的是Direct交换机时,一个交换机绑定多个bindingKey时,将会使得代码繁多,冗余

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 	@Bean
public DirectExchange directExchange(){
return ExchangeBuilder.fanoutExchange("hmall.direct").build();
// return new FanoutExchange("hmall.fanout2");
}

@Bean
public Queue directQueue1(){
// return new Queue("fanout.queue3");
return QueueBuilder.durable("fanout.queue3").build();
}

@Bean
public Binding directBindingRed(Queue directQueue1,DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}

@Bean
public Binding directBindingBlue(Queue directQueue1,DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}

基于注解的方式声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
>//red blue
@RabbitListener(bindings = @QueueBinding(
//durable 持久化
value = @Queue(name = "direct.queue1",durable = "true"),
exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
>))
>public void ldq1(String msg) throws InterruptedException {
System.out.println("c1收到的d1消息为:"+ msg);
>}

>//red yellow
@RabbitListener(bindings = @QueueBinding(
//durable 持久化
value = @Queue(name = "direct.queue2",durable = "true"),
exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
key = {"red","yellow"}
>))
>public void ldq2(String msg) throws InterruptedException {
System.err.println("c2收到的d2消息为....:"+ msg);
}

消息转换器

默认消息转换器的问题

利用SpringAMQP发送一条消息到RabbitMQ的客户端,观察在Object.queue中接收到的消息反映了什么问题?

1
2
3
4
5
6
7
8
9
@Test
//消息转换器
void testSendObject(){
Map<String,Object> msg = new HashMap<>(2);
msg.put("name","Jack");
msg.put("age",21);
rabbitTemplate.convertAndSend("object.queue",msg);
}

接受到的消息体积大,代码可读性低,安全性不高(反序列化执行非法代码)

在对Map类型的消息进行转换时,底层使其进行了(Serialization.serilize) ObjectOutputStream的字节流转换(JDK的序列化方式)

改变消息转换器

采用json序列化

1.引入依赖

在publisher和consumer中都引入jackson依赖,这里在父工程中引用

1
2
3
4
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>

2.在启动类中声明jackson的bean

1
2
3
4
>@Bean
>public MessageConverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
>}

3.测试

1
2
3
4
5
6
7
8
@Test
//消息转换器
void testSendObject(){
Map<String,Object> msg = new HashMap<>(2);
msg.put("name","Jack");
msg.put("age",21);
rabbitTemplate.convertAndSend("object.queue",msg);
}

比之前接受的消息所占体积更小,可读性更高

4.收到的消息

1
2
3
4
5
//jackson消息转换器
@RabbitListener(queues = "object.queue")
public void lo(Map<String,Object> msg) throws InterruptedException {
System.out.println("c2收到的o2消息为....:"+ msg);
}

消息可靠性问题

生产者重连

一般用于网络波动时,服务掉线的情况,设置重连机制保证网络波动时RabbitMQ的可靠性

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
connection-timeout: 1 #MQ的连接超时时间
template:
retry:
enabled: true #开启超时重连机制
initial-interval: 1000ms #失败后的初始等待时间
multiplier: 1 #失败后下次等待时长倍数
max-attempts: 3 #最大重试次数

缺点:

这种方法是阻塞式,每次重试等待的过程中,线程是阻塞的,会影响业务性能

如果对于业务性能有要求,建议禁止使用重试机制,如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。