RabbitMQ:Work Queues-持久化
概念
如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
队列实现持久化
要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化
public class Product02 {
//队列名称
public static final String task_name="ack_name";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//****************************************************
//队列持久化
boolean durable=true;
channel.queueDeclare(task_name,durable,false,false,null);
//****************************************************
//控制台
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
//设计生产者发送消息为持久化,实际将消息保存到磁盘
channel.basicPublish("",task_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("生产者发出消息"+message);
}
}
}
如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误
持久化的队列会显示一个 蓝色的D,这个时候即使重启 rabbitmq 队列也依然存在
消息持久化
要想让消息实现持久化需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。
将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,后面有说!
不公平分发
之前学的是轮训分发。平均顺序分配,但是有的消费者处理的快,有的处理的慢,造成处理快的消费者在队列中的消息被慢者处理时,一直处于空闲状态,而慢者却一直在干活.
避免这种情况可以设置参数channel.basicQos(1); 在消费者中添加!
public class Work02 {
//队列名称
public static final String task_name="ack_name01";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2等待接收消息处理事件较长");
//声明 接收消息
DeliverCallback deliverCallback = (consumerTag, message)->{
//沉睡10s
SleepUtils.sleep(10);
System.out.println("接收到消息"+new String(message.getBody()));
/**
* 1.消息标记 tag
* 2.是否批量应答信道里的消息 false为不批量应答
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被中断");
};
//**************************************************
channel.basicQos(1);
//**************************************************
//采用手动应答
boolean autoAck = false;
channel.basicConsume(task_name,autoAck,deliverCallback,cancelCallback);
}
}
意思就是我的任务还没有完成,我没有应答你,你就别给我分配新任务,mq就会吧任务分配给空闲的消费者.
当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。
预取值
channel 上肯定不止只有一个消息另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。
channel.basicQos(1); 中的1,就是prefetch,该值定义通道上允许的未确认消息的最大数量.一旦到最大数量,mq就不再往这个信道上传递信息.. ,除非至少有一个未处理的消息被确认.(并不是说7条消息,2/5分,c1一定消费5,c2一定消费2,一个消息被确认,空出一个信道位置,mq有任务的话,会继续往里塞!)