ActiveMQ:高特性与相关问题
1.引入消息队列之后如何保证其高可用性
zookeeper+Replicated LevelDB
2.异步投递 Asyns Sends
是什么?
- 异步发送可以让生产者发的更快。
- 如果异步投递不需要保证消息是否发送成功,发送者的效率会有所提高。
官网介绍:http://activemq.apache.org/async-sends
开启useAsynSend=true:
代码实现:
如何确认发送成功
异步发送丢失消息的场景是:生产者设置UseAsyncSend=true,使得producer.send(msg)特续发送消息由于于消息不阻寒,生产者会认为所有send的消息均被成功发送至MQ。
如果MQ突然宏机,此时生产者端内存中尚未被发送全MQ的消息都会丢失。
所以,正确的异步发送方法是需要接收回调的。
同步发送和异步发送的区别就在此,同步发送等send不阻寒了就表示一定发送成功了,异步发送需要接收回执并由客户端再州断一次是否发送成功。
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import javax.jms.*;
import java.util.UUID;
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://0.0.0.0:61616";
private static final String ACTIVEMQ_QUEUE_NAME = "Async";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
activeMQConnectionFactory.setUseAsyncSend(true); //开启
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
textMessage.setJMSMessageID(UUID.randomUUID().toString()+"orderAtguigu"); //给消息加一个属性 id
final String msgId = textMessage.getJMSMessageID();
activeMQMessageProducer.send(textMessage, new AsyncCallback() { //使用asyncCallback回调函数
public void onSuccess() {
System.out.println("成功发送消息Id:"+msgId);
}
public void onException(JMSException e) {
System.out.println("失败发送消息Id:"+msgId);
}
});
}
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
activeMQMessageProducer.close();
session.close();
connection.close();
}
}
}
投递投递和定时投递
介绍
官方文档:http://activemq.apache.org/delay-and-schedule-message-delivery.html
2.修改配置文件并重启
对生产者进行修改:
package com.tinstu.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
import javax.jms.*;
public class JmsProduct {
public static final String ACTIVEMQ_URL = "failover:(tcp://120.48.39.100:61616,tcp://180.76.106.99:61616,tcp://152.136.194.161:61616)";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.创建链接工程,按照给定的URL地址,采用默认用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",ACTIVEMQ_URL);
//2. 通过链接工厂,获取链接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3. 创建回话session
//两个参数,第一个叫事务/第二个叫签收
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//非持久化:当服务器宕机,消息不存在
// messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//持久化:当服务器宕机,消息依然存在 (默认)
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
//!!!!!!!!!添加的内容
long delay = 3*1000;
long period = 4*1000;
int repeat = 5;
//6.通过使用messageProducer生产3条消息发送到MQ队列里面
for(int i = 1;i<=3;i++){
//7.创建队列消息
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
// 调用Message的set*Property()方法,就能设置消息属性。根据value的数据类型的不同,有相应的API。
//!!!!!!!!!添加的内容
//延迟的时间
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
// 重复投递的时间间隔
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,period);
// 重复投递的次数
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("*****消息发送到MQ完成!");
}
}
消费者代码和之前一样
消费者每隔一段时间收到3个消息体,间隔和次数取决于上面的设置
消息消费的重试机制
是什么?
官网文档:http://activemq.apache.org/redelivery-policy
是什么: 消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列中,之后broker不会再将该消息发送给消费者。
如下面的情况2,消费者开启事务但是没有commit提交,消费消息但是没有告诉broker确认收到该消息,broker会将该消息重复发个消费者,默认重试6次,再失败就把该消息变为有毒队列放入死信队列中.
具体哪些情况会引发消息重发
- Client用了transactions且再session中调用了rollback
- Client用了transactions且再调用commit之前关闭或者没有commit
- Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover
消息重发时间间隔和重发次数
间隔:1
次数:6
每秒发6次
有毒消息Poison ACK
一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。
属性说明
验证:
一个开启事务但是没有进行提交的消费者,消费一条消息6次不进行提交,在第7次时,无法接收该消息,因为该消息已经被放入死信队列!
修改默认参数:
整合spring
死信队列
是什么
官网文档: http://activemq.apache.org/redelivery-policy
死信队列:异常消息规避处理的集合,主要处理失败的消息。
死信队列的配置(一般采用默认)
1.sharedDeadLetterStrategy
不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。
2.individualDeadLetterStrategy
可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。
3.自动删除过期消息
过期消息是值生产者指定的过期时间,超过这个时间的消息。
4.存放非持久消息到死信队列中
1.1 消息不被重复消费,幂等性
如何保证消息不被重复消费呢?幕等性问题你谈谈
幂等性如何解决,根据messageid去查这个消息是否被消费了。