商城:RabbitMQ
RabbitMQ:https://www.tinstu.com/category/notes/rabbitmq
docker安装RabbitMQ
docker run -d --name rabbitmq
-p 5671:5671
-p 5672:5672
-p 4369:4369
-p 25672:25672
-p 15671:15671
-p 15672:15672
rabbitmq:management
4369, 25672 (Erlang发现&集群端口)
5672, 5671 (AMQP端口)
15672 (web管理后台端口)
61613, 61614 (STOMP协议端口)
1883, 8883 (MQTT协议端口)
https://www.rabbitmq.com/networking.html
Exchange 类型
direct、fanout、topic、headers
Direct Exchange
消息中的路由键(routing key)如果和Binding 中的 binding key 一致, 交换器 就将消息发到对应的队列中。路由键与队 列名完全匹配,如果一个队列绑定到交换 机要求路由键为“dog”,则只转发 routingkey 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard” 等等。它是完全匹配、单播的模式。
Fanout Exchange
每个发到 fanout 类型交换器的消息都 会分到所有绑定的队列上去。fanout 交 换器不处理路由键,只是简单的将队列 绑定到交换器上,每个发送到交换器的 消息都会被转发到与该交换器绑定的所 有队列上。很像子网广播,每台子网内 的主机都获得了一份复制的消息,此类型是转发消息最快的!
Topic Exchange
topic 交换器通过模式匹配分配消息的 路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单 词,这些单词之间用点隔开。它同样也 会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配一 个单词。
RabbitMQ的整合
1.引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置文件引入
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
3.启动类加注解:@EnableRabbit
AmqpAdmin的使用
使用AmqpAdmin 实现 创建 Exchange queue binding
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void ex() {
//创建 exchange
DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
amqpAdmin.declareExchange(directExchange);
}
@Test
public void qu(){
//创建queue
Queue queue = new Queue("hello-java-queue", true, false, false);
amqpAdmin.declareQueue(queue);
}
RabbitTemplate的使用
使用RabbitTempalte 进行 消息的发送
@Autowired
RabbitTemplate rabbitTemplate;
//发送消息
@Test
public void send(){
String msg = "hello";
rabbitTemplate.convertAndSend("hello-java-exchang","hello.java",msg);
}
如果发送的是一个对象,那么接收到的是一个序列化之后的结果,可以通过添加一个配置类,使接收到的消息为json格式
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
RabbitListener&RabbiHander接受消息
RabbitListener: 可以标注在类上,也可以标注在方法上
RabbiHander:可以标注在方法上,让此方法只接受某个类型的消息
可靠投递-发送端确认
- 保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认 机制
- publisher confirmCallback 确认模式
- publisher returnCallback 未投递到 queue 退回模式
- consumer ack机制
可靠抵达-ConfirmCallback
配置文件:spring.rabbitmq.publisher-confirms=true
CorrelationData:用来表示当前消息唯一性。
消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 confirmCallback。
可靠抵达-ReturnCallback
配置文件:
# 开启发送端消息抵达队列确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步优先回调我们这个 returnconfirm
spring.rabbitmq.template.mandatory=true
confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有 些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到return 退回模式。
这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数 据,定期的巡检或者自动纠错都需要这些数据。
public void initRabbitTemplate(){
/**
* 设置确认回调 消息到达broker
* correlationData: 消息的唯一id
* ack: 消息是否成功收到
* cause:失败的原因
*/
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
*
* message: 投递失败的消息详细信息
* replyCode: 回复的状态码
* replyText: 回复的文本内容
* exchange: 当时这个发送给那个交换机
* routerKey: 当时这个消息用那个路由键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
可靠抵达-Ack消息确认机制
消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成, 或者成功处理。我们可以开启手动ack模式
# 手动ack消息 消息不容易丢失
spring.redis.listener.simple.acknowledge-mode=manual
long delivertTag = massage.getMessageProperties().getDeliveryTag();
如何签收:
签收: channel.basicAck(deliveryTag, false); //业务完成就应该签收
拒签: channel.basicNack(deliveryTag, false,true); //拒签,业务失败,true表不把消息丢弃,要把消息重新放到队列中去!