商城:订单服务-订单与RabbitMQ相关

创建业务交换机-队列

 

监听库存解锁

库存解锁的场景:

(1).下订单成功,库存过期没有支付被系统自动取消,被用户手动取消,都要解锁库存

(2).下单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁

/**
 * 1.如果每一个商品都锁成功,将当前商品锁定了几件的工作单记录发给MQ 后面在根据这个表写解锁方法
 * 2.锁定失败.前面保存的工作单信息就回滚了,发出去的消息。即使要解锁记录,去数据库查不到ID,所有不要解锁
 */

 

库存解锁逻辑

库存可能的情况

  • 1.库存锁定
    • 下订单成功,库存锁定成功,接下来的业务调用失败。导致订单回滚
  • 2.订单失败
    • 锁库存失败,库存回滚了,锁定记录表中也无数据,此情况无需解锁

库存解锁

1.查询数据库关于这个订单的锁定库存信息

  • 如果有-解锁  再看 订单情况.
    • 1.没有这个订单,必须解锁
    • 2.有这个订单
      • 订单状态 已取消,解锁库存,没取消,不能解锁
  • 如果没有-锁库存失败,库存回滚了,锁定记录表中也无数据,此情况无需解锁

(远程调用product-order时,拦截器放行 “order/omsorder/status/**”)

ware创建 两个队列 与 接收解锁消息的监听器

@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {
    @Autowired
    WmsWareSkuService wareSkuService;

    @RabbitHandler
    public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
        System.out.println("收到解锁消息...");
        try {
            wareSkuService.unlockStock(to);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
}

 

 

com.tinstu.gulimall.ware.service.impl.WmsWareSkuServiceImpl

定时关单

Order模块创建  两个队列 与 接受过期订单消息的监听器,接收到就讲订单的状态改为4,已关闭  再进行库存解锁

@Service
public class OrderCloseListener {

    @Autowired
    OmsOrderService orderService;

    @RabbitListener(queues = "order.release.order.queue")
    public void listener(OmsOrderEntity entity, Channel channel, Message message) throws IOException {
        System.out.println("收到的过期的订单消息,准备关闭订单"+entity.getOrderSn());
        try{
            orderService.closeOrder(entity);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
}

 问题

 

现在  order.delay.queue  和  stock.delay.queue  都是设置的20s进入  order.release.order.queue  和  stock.release.stock.queue

如果订单服务卡段,导致订单状态消息一直改不了,库存消息优先到期,查订单状态为新建状态,不是关闭状态,什么都不做就走了

导致卡顿的订单永远也解不了锁

解决方法:

新建一个消息监听器,监听的消息是:在订单修改为关闭时,发给mq的消息

    @RabbitHandler
    public void handleOrderLockedRelease(OrderTo orderTO, Message message, Channel channel) throws IOException {
        System.out.println("订单关闭准备解锁库存...");
        try {
            wareSkuService.unlockStock02(orderTO);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }

如果上面那个库存解锁没有解锁成功,这个还能解锁一次,如果上一个解锁成功了,这个就只查询一次 不进行解锁!

消息丢失,积压,重复等问题

如何保证消息可靠性-消息丢失

消息发送出去,由于网络问题没有抵达服务器

  • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机 制,可记录到数据库,采用定期扫描重发的方式
  • 做好日志记录,每个消息状态是否都被服务器收到都应该记录
  • 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进 行重发

消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚 未持久化完成,宕机。

  • publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机

  • 一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck并重 新入队

如何保证消息可靠性-消息重复

消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息 重新由unack变为ready,并发送给其他消费者

消息消费失败,由于重试机制,自动又将消息发送出去

成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送

  • 消费者的业务消费接口应该设计为幂等性的。比如扣库存有 工作单的状态标志
  • 使用防重表(redis/mysql),发送消息每一个都有业务的唯 一标识,处理过就不用处理
  • rabbitMQ的每一个消息都有redelivered字段,可以获取是被重新投递过来的,而不是第一次投递过来的

如何保证消息可靠性-消息积压

消费者宕机积压

消费者消费能力不足积压

发送者发送流量太大

  • 上线更多的消费者,进行正常消费
  • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

 

 

 

 

阅读剩余
THE END