ActiveMQ:JMS规范-消息的事务性与签收机制

    持久性,事务,签收,共同保证了jsm的可靠性

    事务性

    消息的生产者事务介绍

    生产者开启事务后,执行commit方法,这批消息才真正的被提交。不执行commit方法,这批消息不会提交。执行rollback方法,之前的消息会回滚掉。生产者的事务机制,要高于签收机制,当生产者开启事务,签收机制不再重要。

    package com.tinstu.activemq.tx;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class JmsProductTX {
        private static final String ACTIVEMQ_URL = "tcp://120.48.39.100:61616";
        private static final String ACTIVEMQ_QUEUE_NAME = "Queue-TX";
    
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //1.创建会话session,两个参数transacted=事务,acknowledgeMode=确认模式(签收)
            //设置为开启事务
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
            MessageProducer producer = session.createProducer(queue);
            try {
                for (int i = 0; i < 3; i++) {
                    TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                    producer.send(textMessage);
                  /*
                     //模拟出现异常,消息进行回滚,之前的消息都会回滚掉
                     if(i == 2){
                        throw new RuntimeException("GG.....");
                    }
                  */
                }
                // 2. 开启事务后,使用commit提交事务,这样这批消息才能真正的被提交。
                session.commit();
                System.out.println("消息发送完成");
            } catch (Exception e) {
                System.out.println("出现异常,消息回滚");
                // 3. 工作中一般,当代码出错,我们在catch代码块中回滚。这样这批发送的消息就能回滚。
                session.rollback();
            } finally {
                //4. 关闭资源
                producer.close();
                session.close();
                connection.close();
            }
        }
    }
    

     消息消费者的事务介绍

    消费者开启事务后,执行commit方法,这批消息才算真正的被消费。不执行commit方法,这些消息不会标记已消费,下次还会被消费。执行rollback方法,是不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息,下次还会被消费。消费者利用commit和rollback方法,甚至能够违反一个消费者只能消费一次消息的原理

    package com.tinstu.activemq.tx;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    public class JmsConsumerTX {
        private static final String ACTIVEMQ_URL = "tcp://120.48.39.100:61616";
        private static final String ACTIVEMQ_QUEUE_NAME = "Queue-TX";
    
        public static void main(String[] args) throws JMSException, IOException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            // 创建会话session,两个参数transacted=事务,acknowledgeMode=确认模式(签收)
            // 消费者开启了事务就必须手动提交,不然会重复消费消息
            final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
            MessageConsumer messageConsumer = session.createConsumer(queue);
            while (true){
                TextMessage textMessage = (TextMessage) messageConsumer.receive();
                if(null != textMessage){
                    System.out.println("****消费者接收信息:"+textMessage.getText());
                }else {
                    break;
                }
            }
            //关闭资源
            System.in.read();
            //如果不写的话,控制台显示消息,但是实际上消息未被消费,仍可再次被消费
            session.commit();
          /*  //控制台显示消息,但是信息未被消费
            session.rollback(); */
            messageConsumer.close();
            session.close();
            connection.close();
        }
    }
    

    签收机制

    消费者非事务模式下消费者签收

    默认是自动签收,签收消息的意思就是让某个消息是已消费的状态,不签收就会使消息反复被消费!

    (当生产者开启事务,签收机制不再重要。)

    四种签收方式:

    • 自动签收(AUTO_ACKNOWLEDGE):该方式是默认的。该种方式,无需我们程序做任何操作,框架会帮我们自动签收收到的消息。
    • 手动签收(CLIENT_ACKNOWLEDGE):手动签收。该种方式,需要我们手动调用Message.acknowledge(),来签收消息。如果不签收消息,该消息会被我们反复消费,只到被签收。
    • 允许重复消息(DUPS_OK_ACKNOWLEDGE):多线程或多个消费者同时消费到一个消息,因为线程不安全,可能会重复消费。该种方式很少使用到。
    • 事务下的签收(SESSION_TRANSACTED):开始事务的情况下,可以使用该方式。该种方式很少使用到。

    如果消费者的签收方式改为手动签收 Session.CLIENT_ACKNOWLEDGE

    那么就需要对消息进行手动签收

                    System.out.println("****消费者接收信息:"+textMessage.getText());
                    textMessage.acknowledge();  //签收消息

    如果不进行手动签收的话,消息就不会被消费!造成重复消费

    消息有事务模式下的消费者签收

    签收和事务的关系

    • 在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。事务优先于签收,开始事务后,签收机制不再起任何作用。
    • 非事务性会话中,消息何时被确认取决于创建会话时的应答模式。
    • 生产者事务开启,只有commit后才能将全部消息变为已消费。
    • 事务偏向生产者,签收偏向消费者。也就是说,生产者使用事务更好点,消费者使用签收机制更好点。

    总结

    jsm点对点总结

    点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。

    如果在Session关闭时有部分消息己被收到但还没有被签收(acknowledged),那当消费者下次连接到相同的队列时,这些消息还会被再次接收

    队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势

    jsm发布订阅总结

    • JMS的发布订阅总结

    JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic。

    主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。

    主题使得消息订阅者和消息发布者保持互相独立不需要解除即可保证消息的传送

    • 非持久订阅

    非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收发到某个主题的消息。

    如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。

    一句话:先订阅注册才能接受到发布,只给订阅者发布消息。

    • 持久订阅

    客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ的时候,会根据消费者的ID得到所有当自己处于离线时发送到主题的消息

    当持久订阅状态下,不能恢复或重新派送一个未签收的消息。

    持久订阅才能恢复或重新派送一个未签收的消息。

    • 非持久和持久化订阅如何选择

    当所有的消息必须被接收,则用持久化订阅。当消息丢失能够被容忍,则用非持久订阅。

    阅读剩余
    THE END