ActiveMQ:Java实现ActiveMQ通讯入门案例
以队列消息(queue)作为生产者的入门案例
pom.xml导入依赖
<dependencies>
<!-- activemq 所需要的jar 包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- activemq 和 spring 整合的基础包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
</dependencies>
消息的生产者
package com.tinstu.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduct {
public static final String ACTIVEMQ_URL = "tcp://120.48.39.100:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.创建链接工程,按照给定的URL地址,采用默认用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("userName","passWord",ACTIVEMQ_URL);
//2. 通过链接工厂,获取链接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3. 创建回话session
//两个参数,第一个叫事务/第二个叫签收
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//6.通过使用messageProducer生产3条消息发送到MQ队列里面
for(int i = 1;i<=3;i++){
//7.创建队列消息
TextMessage textMessage = session.createTextMessage("msg-----"+i);
//8.通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("*****消息发送到MQ完成!");
}
}
运行后进入 activeMQ 后台查看:
消息消费者:
写法1:
同步阻塞方式(receive())
订阅者或接收者调用MessageConsumer的receive( )方法来接收消息,receive方法在能够接收消息到消息之前(或超时之前)将一直阻塞!
package com.tinstu.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://120.48.39.100:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.创建链接工程,按照给定的URL地址,采用默认用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin", "admin", ACTIVEMQ_URL);
//2. 通过链接工厂,获取链接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3. 创建回话session
//两个参数,第一个叫事务/第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true){
//receive(); 表示一直在这里等 receive(4000L);等4秒
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if(null != textMessage){
System.out.println("****消费者接收信息:"+textMessage.getText());
}else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();
}
}
运行结果截图:
查看activeMQ控制台的变化
写法2: 通过监听的方式来监听消息
异步非阻塞方式(监听器 onMassage( ))
订阅者或接收者通过MessageConsumer的setMessageListener(MessageLister listener)注册一个监听器
当消息到达之后,系统自动调用监听器Message的 onMessage(Message message)方法
package com.tinstu.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://120.48.39.100:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
//1.创建链接工程,按照给定的URL地址,采用默认用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin", "admin", ACTIVEMQ_URL);
//2. 通过链接工厂,获取链接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3. 创建回话session
//两个参数,第一个叫事务/第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("****接收者接收到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
消费者的三大消费情况
1.先生产 : 只启动一号消费者,一号消费者能消费信息
2.先生产:先启动一号消费者,在启动二号消费者,一号可以消费信息,二号不能消费信息
3.先启动2个消费者:再生产6条消息,一人一半!
以主题(topic)作为生产者案例
topic介绍
在发布订阅消息传递域中,目的地被称为主题(topic)
发布/订阅消息传递域的特点如下:
(1)生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
(2)生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
(3)生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
一句话,好比我们的微信公众号订阅
生产者创建目的地时使用 topic
//4.创建目的地(具体是队列还是主题topic)
Topic topic = session.createTopic(TOPIC_NAME);
消费者创建目的地时使用topic
// 4 创建目的地 (两种 : 队列/主题 这里用主题)
Topic topic = session.createTopic(TOPIC_NAME);
先启动三个消费者,后启动消费者生产3条消息
结果:
三个消费者都能接收到消息!
(如果先生产,后启动消费者的话,消费者无法接收数据)
queue与topic的区别
阅读剩余
版权声明:
作者:Tin
链接:http://www.tinstu.com/1857.html
文章版权归作者所有,未经允许请勿转载。
THE END