ActiveMQ:Java实现ActiveMQ通讯入门案例

 

以队列消息(queue)作为生产者的入门案例

pom.xml导入依赖

    <dependencies>
        <!--  activemq  所需要的jar 包-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>
        <!--  activemq 和 spring 整合的基础包 -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
    </dependencies>

消息的生产者

package com.tinstu.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsProduct {
    public static final String ACTIVEMQ_URL = "tcp://120.48.39.100:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        //1.创建链接工程,按照给定的URL地址,采用默认用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("userName","passWord",ACTIVEMQ_URL);
        //2. 通过链接工厂,获取链接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        //3. 创建回话session
        //两个参数,第一个叫事务/第二个叫签收
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列还是主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        //6.通过使用messageProducer生产3条消息发送到MQ队列里面
        for(int i = 1;i<=3;i++){
            //7.创建队列消息
            TextMessage textMessage = session.createTextMessage("msg-----"+i);
            //8.通过messageProducer发送给mq
            messageProducer.send(textMessage);
        }
        //关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("*****消息发送到MQ完成!");

    }
}

运行后进入 activeMQ 后台查看:

消息消费者:

写法1:

同步阻塞方式(receive())

订阅者或接收者调用MessageConsumer的receive( )方法来接收消息,receive方法在能够接收消息到消息之前(或超时之前)将一直阻塞!

package com.tinstu.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "tcp://120.48.39.100:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        //1.创建链接工程,按照给定的URL地址,采用默认用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin", "admin", ACTIVEMQ_URL);
        //2. 通过链接工厂,获取链接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        //3. 创建回话session
        //两个参数,第一个叫事务/第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列还是主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
         //5.创消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        while (true){
          //receive(); 表示一直在这里等    receive(4000L);等4秒
          TextMessage textMessage = (TextMessage) messageConsumer.receive();
          if(null != textMessage){
              System.out.println("****消费者接收信息:"+textMessage.getText());
          }else {
              break;
          }
        }
         messageConsumer.close();
        session.close();
        connection.close();
    }
}

运行结果截图:

 

查看activeMQ控制台的变化

写法2:  通过监听的方式来监听消息

异步非阻塞方式(监听器 onMassage( ))

订阅者或接收者通过MessageConsumer的setMessageListener(MessageLister listener)注册一个监听器

当消息到达之后,系统自动调用监听器Message的 onMessage(Message message)方法

package com.tinstu.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "tcp://120.48.39.100:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException {
        //1.创建链接工程,按照给定的URL地址,采用默认用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin", "admin", ACTIVEMQ_URL);
        //2. 通过链接工厂,获取链接connection并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        //3. 创建回话session
        //两个参数,第一个叫事务/第二个叫签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(具体是队列还是主题topic)
        Queue queue = session.createQueue(QUEUE_NAME);
         //5.创消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("****接收者接收到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

 消费者的三大消费情况

1.先生产  : 只启动一号消费者,一号消费者能消费信息

2.先生产:先启动一号消费者,在启动二号消费者,一号可以消费信息,二号不能消费信息

3.先启动2个消费者:再生产6条消息,一人一半!

以主题(topic)作为生产者案例

topic介绍

在发布订阅消息传递域中,目的地被称为主题(topic)

发布/订阅消息传递域的特点如下:

(1)生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;

(2)生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息

(3)生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。

一句话,好比我们的微信公众号订阅

生产者创建目的地时使用 topic

//4.创建目的地(具体是队列还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);

消费者创建目的地时使用topic

// 4 创建目的地 (两种 : 队列/主题   这里用主题)
 Topic topic = session.createTopic(TOPIC_NAME);

先启动三个消费者,后启动消费者生产3条消息

结果:

三个消费者都能接收到消息!

(如果先生产,后启动消费者的话,消费者无法接收数据)

queue与topic的区别

阅读剩余
THE END