ActiveMQ:高特性与相关问题

1.引入消息队列之后如何保证其高可用性

zookeeper+Replicated LevelDB

2.异步投递  Asyns Sends

是什么?

  • 异步发送可以让生产者发的更快。
  • 如果异步投递不需要保证消息是否发送成功,发送者的效率会有所提高。

官网介绍:http://activemq.apache.org/async-sends

开启useAsynSend=true:

代码实现:

如何确认发送成功

异步发送丢失消息的场景是:生产者设置UseAsyncSend=true,使得producer.send(msg)特续发送消息由于于消息不阻寒,生产者会认为所有send的消息均被成功发送至MQ。
如果MQ突然宏机,此时生产者端内存中尚未被发送全MQ的消息都会丢失。
所以,正确的异步发送方法是需要接收回调的。
同步发送和异步发送的区别就在此,同步发送等send不阻寒了就表示一定发送成功了,异步发送需要接收回执并由客户端再州断一次是否发送成功。

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;

import javax.jms.*;
import java.util.UUID;

public class Jms_TX_Producer {

    private static final String ACTIVEMQ_URL = "tcp://0.0.0.0:61616";

    private static final String ACTIVEMQ_QUEUE_NAME = "Async";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        activeMQConnectionFactory.setUseAsyncSend(true);  //开启
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue);
        try {
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                textMessage.setJMSMessageID(UUID.randomUUID().toString()+"orderAtguigu"); //给消息加一个属性 id
                final String  msgId = textMessage.getJMSMessageID();
                activeMQMessageProducer.send(textMessage, new AsyncCallback() {  //使用asyncCallback回调函数
                    public void onSuccess() {
                        System.out.println("成功发送消息Id:"+msgId);
                    }

                    public void onException(JMSException e) {
                        System.out.println("失败发送消息Id:"+msgId);
                    }
                });
            }
            System.out.println("消息发送完成");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            activeMQMessageProducer.close();
            session.close();
            connection.close();
        }
    }
}

 投递投递和定时投递

介绍

官方文档:http://activemq.apache.org/delay-and-schedule-message-delivery.html

2.修改配置文件并重启

对生产者进行修改:

package com.tinstu.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;

import javax.jms.*;

public class JmsProduct {
    public static final String ACTIVEMQ_URL = "failover:(tcp://120.48.39.100:61616,tcp://180.76.106.99:61616,tcp://152.136.194.161:61616)";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        //1.创建链接工程,按照给定的URL地址,采用默认用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",ACTIVEMQ_URL);
        //2. 通过链接工厂,获取链接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        //3. 创建回话session
        //两个参数,第一个叫事务/第二个叫签收
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列还是主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);

        //非持久化:当服务器宕机,消息不存在
       // messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //持久化:当服务器宕机,消息依然存在 (默认)
         messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        //!!!!!!!!!添加的内容
         long delay = 3*1000;
         long period = 4*1000;
         int repeat = 5;
        //6.通过使用messageProducer生产3条消息发送到MQ队列里面
        for(int i = 1;i<=3;i++){
            //7.创建队列消息
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            // 调用Message的set*Property()方法,就能设置消息属性。根据value的数据类型的不同,有相应的API。
            //!!!!!!!!!添加的内容            
            //延迟的时间
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
            // 重复投递的时间间隔           
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period);
            // 重复投递的次数        
            textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
            messageProducer.send(textMessage);
        }
        //关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("*****消息发送到MQ完成!");
    }
}

消费者代码和之前一样

消费者每隔一段时间收到3个消息体,间隔和次数取决于上面的设置

消息消费的重试机制

 

是什么?

官网文档:http://activemq.apache.org/redelivery-policy

是什么: 消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列中,之后broker不会再将该消息发送给消费者。

如下面的情况2,消费者开启事务但是没有commit提交,消费消息但是没有告诉broker确认收到该消息,broker会将该消息重复发个消费者,默认重试6次,再失败就把该消息变为有毒队列放入死信队列中.

具体哪些情况会引发消息重发

  • Client用了transactions且再session中调用了rollback
  • Client用了transactions且再调用commit之前关闭或者没有commit
  • Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover

消息重发时间间隔和重发次数

间隔:1

次数:6

每秒发6次

有毒消息Poison ACK

一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。

属性说明

验证:

一个开启事务但是没有进行提交的消费者,消费一条消息6次不进行提交,在第7次时,无法接收该消息,因为该消息已经被放入死信队列!

修改默认参数:

整合spring

死信队列

是什么

官网文档: http://activemq.apache.org/redelivery-policy

死信队列:异常消息规避处理的集合,主要处理失败的消息。

死信队列的配置(一般采用默认)

1.sharedDeadLetterStrategy

不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。

2.individualDeadLetterStrategy

可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。

3.自动删除过期消息

过期消息是值生产者指定的过期时间,超过这个时间的消息。

4.存放非持久消息到死信队列中

1.1 消息不被重复消费,幂等性

如何保证消息不被重复消费呢?幕等性问题你谈谈

幂等性如何解决,根据messageid去查这个消息是否被消费了。

阅读剩余
THE END