初识MQ

同步调用的问题


同步调用的优点:
:::color1
•时效性较强,可以立即得到结果
:::
同步调用的问题:
:::danger
•耦合度高
•性能和吞吐能力下降
•有额外的资源消耗
•有级联失败问题
:::
异步调用





异步通信的优点:
:::danger
•耦合度低
•吞吐量提升
•故障隔离
•流量削峰
:::
异步通信的缺点:
:::success
•依赖于Broker的可靠性、安全性、吞吐能力
•架构复杂了,业务没有明显的流程线,不好追踪管理
:::
不同MQ对比

追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
RabbitMQ快速入门
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/
安装RabbitMQ
我们在Centos7虚拟机中使用Docker来安装。
这里选用在线拉取镜像的方式
1
|
docker pull rabbitmq:3-management
|
执行下面的命令来运行MQ容器:
1
2
3
4
5
6
7
8
9
|
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
|
安装完成后即可登录:

RabbitMQ的结构和概念

RabbitMQ中的几个概念:
:::success
- channel:通道,操作MQ的工具
- exchange:交换机,路由消息到队列中
- queue:队列,存放消息
- virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组,可用它支持多租户模式,可以理解成namespace
:::
常见消息模型
MQ的官方文档RabbitMQ Tutorials | RabbitMQ中给出了5个MQ的Demo示例,对应了几种不同的用法:

简单队列模型(BasicQueue)
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
:::color1
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
:::

案例:

PublisherTest:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.79.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
|
信息发送完成后:

ConsumerTest:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package cn.itcast.mq.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.79.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
|
控制台:


基本消息队列的消息发送流程:
:::color2
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
:::
基本消息队列的消息接收流程:
:::color2
- 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
:::
SpringAMQP实现简单队列
什么是SpringAMQP
SpringAmqp的官方地址:Spring AMQP

利用SpringAMQP实现HelloWorld中的基础消息队列功能:
流程如下:
1.在父工程中引入spring-amqp的依赖
2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
3.在consumer服务中编写消费逻辑,绑定simple.queue这个队列
步骤1:引入AMQP依赖
因为publisher和consumer服务都需要amqp依赖,因此这里把依赖直接放到父工程mq-demo中:
1
2
3
4
5
|
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
|
步骤2:在publisher中编写测试方法,向simple.queue发送消息
1.在publisher服务中编写application.yml,添加mq连接信息:
1
2
3
4
5
6
7
|
spring:
rabbitmq:
host: 192.168.79.128
port: 5672
virtual-host: /
username: itcast
password: 123321
|
2.在publisher服务中新建一个测试类,编写测试方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() {
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend("simple.queue", message);
System.out.println("发送消息成功:【" + message + "】");
}
}
|
步骤3:在consumer中编写消费逻辑,监听simple.queue
1.在consumer服务中编写application.yml,添加mq连接信息:
1
2
3
4
5
6
7
|
spring:
rabbitmq:
host: 192.168.79.128
port: 5672
virtual-host: /
username: itcast
password: 123321
|
2.在consumer服务中新建一个类,编写消费逻辑:
1
2
3
4
5
6
7
|
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listen(String message) {
System.out.println("接收到simple.queue消息:" + message);
}
}
|
Work Queue 工作队列


步骤1:生产者循环发送消息到simple.queue
在publisher服务中添加一个测试方法,循环发送50条消息到simple.queue队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@Resource
private RabbitAdmin rabbitAdmin;
/**
* 测试发送消息到工作队列
*/
@Test
public void testSendMessageToWorkQueue() {
// 创建队列
Queue queue = new Queue("work.queue", true, false, false);
rabbitAdmin.declareQueue(queue);
for (int i = 1; i <= 50; i++) {
String message = "hello, spring amqp!————" + i;
rabbitTemplate.convertAndSend("work.queue", message);
System.out.println("发送消息成功:【" + message + "】");
}
}
|
步骤2:编写两个消费者,都监听simple.queue
在consumer服务中添加一个消费者,也监听simple.queue:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
/**
* 监听工作队列
*
* @param message 消息
*/
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) throws InterruptedException {
System.out.println("消费者1接收到work.queue消息:" + message + " time:" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) throws InterruptedException {
System.err.println("消费者2。。。接收到work.queue消息:" + message + " time:" + LocalTime.now());
Thread.sleep(200);
}
|
消息大概消费了五秒钟:


这是因为有消费预取的原因
消费预取限制
修改application.yml文件,设置preFetch这个值(默认是无限制),可以控制预取消息的上限:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: 192.168.79.128
port: 5672
virtual-host: /
username: itcast
password: 123321
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
|
发布订阅模式
发布( Publish )、订阅( Subscribe )
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
常见exchange类型包括:
:::color3
Fanout:广播
Direct:路由
Topic:话题
:::

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失
发布订阅-Fanout Exchange

利用SpringAMQP演示FanoutExchange的使用
实现思路如下:
- 在consumer服务中,利用代码声明队列、交换机,并将两者绑定
- 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
- 在publisher中编写测试方法,向itcast.fanout发送消息
步骤1:在consumer服务声明Exchange、Queue、Binding
SpringAMQP提供了声明交换机、队列、绑定关系的API,例如:

在consumer服务常见一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
@Configuration
public class FanoutConfig {
// 声明交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("itcast.fanout");
}
// 声明队列1
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
// 声明队列2
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
// 将队列1绑定到交换机
@Bean
public Binding bindingQueue1ToExchange(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// 将队列2绑定到交换机
@Bean
public Binding bindingQueue2ToExchange(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
|
步骤2:在consumer服务声明两个消费者
在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
/**
* 监听发布/订阅模式 广播exchange
*
* @param message 消息
*/
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {
System.out.println("消费者1接收到fanout.queue1消息:" + message);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {
System.out.println("消费者2接收到fanout.queue2消息:" + message);
}
|
步骤3:在publisher服务发送消息到FanoutExchange
在publisher服务的SpringAmqpTest类中添加测试方法:
1
2
3
4
5
6
7
8
9
10
11
12
|
/**
* 测试发送消息到发布/订阅模式 广播exchange
*/
@Test
public void testSendMessageToFanoutExchange() {
// 交换机名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, fanout message!";
// 发送消息,参数分别是:交互机名称、RoutingKey(暂时为空)、消息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
|
效果:

交换机的作用是什么?
:::success
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
:::
声明队列、交换机、绑定关系的Bean是什么?
:::color2
- Queue
- FanoutExchange
- Binding
:::
发布订阅-DirectExchange
DirectExchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
:::success
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
:::

利用SpringAMQP演示DirectExchange的使用
实现思路如下:
- 利用@RabbitListener声明Exchange、Queue、RoutingKey
- 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
- 在publisher中编写测试方法,向itcast. direct发送消息

步骤1:在consumer服务声明Exchange、Queue
1.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2,
2.并利用@RabbitListener声明Exchange、Queue、RoutingKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
/**
* 监听发布/订阅模式 路由direct exchange
*
* @param message 消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "direct.queue1"),
exchange = @Exchange(value = "direct.exchange", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String message) {
System.out.println("消费者1接收到direct.queue1消息:" + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "direct.queue2"),
exchange = @Exchange(value = "direct.exchange", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String message) {
System.out.println("消费者2接收到direct.queue2消息:" + message);
}
|
步骤2:在publisher服务发送消息到DirectExchange
在publisher服务的SpringAmqpTest类中添加测试方法:
1
2
3
4
5
6
7
8
9
10
11
|
/**
* 测试发送消息到发布/订阅模式 路由exchange
*/
@Test
public void testSendMessageToDirectExchange() {
// 交换机名称
String exchangeName = "direct.exchange";
rabbitTemplate.convertAndSend(exchangeName, "blue", "hello, blue direct message!");
rabbitTemplate.convertAndSend(exchangeName, "yellow", "hello, green direct message!");
rabbitTemplate.convertAndSend(exchangeName, "red", "hello, yellow direct message!");
}
|
结果:

描述下Direct交换机与Fanout交换机的差异?
:::success
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
:::
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
:::success
@Queue
@Exchange
:::
发布订阅-TopicExchange
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词

利用SpringAMQP演示TopicExchange的使用
实现思路如下:
1.并利用@RabbitListener声明Exchange、Queue、RoutingKey
2.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
3.在publisher中编写测试方法,向itcast. topic发送消息

步骤1:在consumer服务声明Exchange、Queue
1.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,
2.并利用@RabbitListener声明Exchange、Queue、RoutingKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
/**
* 监听发布/订阅模式 主题topic exchange
*
* @param message 消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "topic.queue1"),
exchange = @Exchange(value = "topic.exchange", type = ExchangeTypes.TOPIC),
key = {"china.#"}
))
public void listenTopicQueue1(String message) {
System.out.println("消费者1接收到topic.queue1消息 china的所有消息:" + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "topic.queue2"),
exchange = @Exchange(value = "topic.exchange", type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicQueue2(String message) {
System.out.println("消费者2接收到topic.queue2消息 所有news的消息:" + message);
}
|
步骤2:在publisher服务发送消息到TopicExchange
在publisher服务的SpringAmqpTest类中添加测试方法:
1
2
3
4
5
6
7
8
9
10
11
|
/**
* 测试发送消息到发布/订阅模式 主题exchange
*/
@Test
public void testSendMessageToTopicExchange() {
// 交换机名称
String exchangeName = "topic.exchange";
rabbitTemplate.convertAndSend(exchangeName, "china.news", "china.news topic message!");
rabbitTemplate.convertAndSend(exchangeName, "china.weather", "china.weather topic message!");
rabbitTemplate.convertAndSend(exchangeName, "us.news", "us.news topic message!");
}
|
结果:

描述下Direct交换机与Topic交换机的差异?
:::success
Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词
:::
SpringAMQP-消息转换器
SpringAMQP中消息的序列化和反序列化是怎么实现的?
:::danger
利用MessageConverter实现的,默认是JDK的序列化
注意发送方与接收方必须使用相同的MessageConverter
:::
在父pom中引入依赖:
1
2
3
4
|
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
|
在publisher和consumer的启动类中增加配置:
1
2
3
4
|
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
|