商城:RabbitMQ

RabbitMQ:https://www.tinstu.com/category/notes/rabbitmq

docker安装RabbitMQ

docker run -d  --name rabbitmq
-p 5671:5671
-p 5672:5672
-p 4369:4369
-p 25672:25672
-p 15671:15671
-p 15672:15672
rabbitmq:management

4369, 25672 (Erlang发现&集群端口)

5672, 5671 (AMQP端口)

15672 (web管理后台端口)

61613, 61614 (STOMP协议端口)

1883, 8883 (MQTT协议端口)

https://www.rabbitmq.com/networking.html

Exchange 类型

direct、fanout、topic、headers

Direct Exchange

消息中的路由键(routing key)如果和Binding 中的 binding key 一致, 交换器 就将消息发到对应的队列中。路由键与队 列名完全匹配,如果一个队列绑定到交换 机要求路由键为“dog”,则只转发 routingkey 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard” 等等。它是完全匹配、单播的模式。

Fanout Exchange

每个发到 fanout 类型交换器的消息都 会分到所有绑定的队列上去。fanout 交 换器不处理路由键,只是简单的将队列 绑定到交换器上,每个发送到交换器的 消息都会被转发到与该交换器绑定的所 有队列上。很像子网广播,每台子网内 的主机都获得了一份复制的消息,此类型是转发消息最快的!

Topic Exchange

topic 交换器通过模式匹配分配消息的 路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。 它将路由键和绑定键的字符串切分成单 词,这些单词之间用点隔开。它同样也 会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配一 个单词。

RabbitMQ的整合

1.引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.配置文件引入

 

spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

3.启动类加注解:@EnableRabbit

AmqpAdmin的使用

使用AmqpAdmin 实现 创建 Exchange  queue  binding

    @Autowired
    AmqpAdmin amqpAdmin;
    @Test
     public void ex() {
        //创建 exchange
        DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
    }
    @Test
    public void qu(){
        //创建queue
        Queue queue = new Queue("hello-java-queue", true, false, false);
        amqpAdmin.declareQueue(queue);
    }

 

RabbitTemplate的使用

使用RabbitTempalte 进行 消息的发送

    @Autowired
     RabbitTemplate rabbitTemplate;
    //发送消息
    @Test
    public void  send(){
        String msg = "hello";
        rabbitTemplate.convertAndSend("hello-java-exchang","hello.java",msg);
    }

如果发送的是一个对象,那么接收到的是一个序列化之后的结果,可以通过添加一个配置类,使接收到的消息为json格式

@Configuration
public class MyRabbitConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

RabbitListener&RabbiHander接受消息

 

RabbitListener: 可以标注在类上,也可以标注在方法上

RabbiHander:可以标注在方法上,让此方法只接受某个类型的消息

可靠投递-发送端确认

  • 保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认 机制
  • publisher confirmCallback 确认模式
  • publisher returnCallback 未投递到 queue 退回模式
  • consumer ack机制

可靠抵达-ConfirmCallback

配置文件:spring.rabbitmq.publisher-confirms=true

CorrelationData:用来表示当前消息唯一性。
消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 confirmCallback。

可靠抵达-ReturnCallback

配置文件:

# 开启发送端消息抵达队列确认
spring.rabbitmq.publisher-returns=true
# 只要抵达队列,以异步优先回调我们这个 returnconfirm
spring.rabbitmq.template.mandatory=true

confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有 些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到return 退回模式。
这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数 据,定期的巡检或者自动纠错都需要这些数据。

public void initRabbitTemplate(){
		/**
		 * 		设置确认回调  消息到达broker
		 *  correlationData: 消息的唯一id
		 *  ack: 消息是否成功收到
		 * 	cause:失败的原因
		 */
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });

		/**
		 * 只要消息没有投递给指定的队列,就触发这个失败回调
		 *
		 * message: 投递失败的消息详细信息
		 * replyCode: 回复的状态码
		 * replyText: 回复的文本内容
		 * exchange: 当时这个发送给那个交换机
		 * routerKey: 当时这个消息用那个路由键
		 */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });

 可靠抵达-Ack消息确认机制

消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成, 或者成功处理。我们可以开启手动ack模式

# 手动ack消息 消息不容易丢失
spring.redis.listener.simple.acknowledge-mode=manual

long delivertTag = massage.getMessageProperties().getDeliveryTag();
如何签收:
     签收: channel.basicAck(deliveryTag, false);   //业务完成就应该签收
     拒签: channel.basicNack(deliveryTag, false,true); //拒签,业务失败,true表不把消息丢弃,要把消息重新放到队列中去!

 

 

阅读剩余
THE END