RabbitMQ:Work Queues-轮训分发消息
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
轮训分发消息
启动两个工作线程,一个消息发送线程,看看两个工作线程是如何工作的。
抽取公共类
写一个链接工程创建信道的工具类 RabbitMqUtils.java
public class RabbitMqUtils {
//得到一个连接的 channel
public static Channel getChannel() throws Exception{
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("180.76.106.99");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
启动两个工作线程
1.先写一个消费者
public class Work01 {
private final static String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//消息的接收
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println(new String(message.getBody()));
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被中断");
};
System.out.println("线程1等待消息......");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
启动之后,idea设置允许多实例,再次启动这个消费者
此时有两个线程 的消费者
编写一个发送线程
public class Task01 {
public static final String QUEUE_NAME ="hello";
//发送大量消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//从控制台当中接收消息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}
}
}
程序测试
通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息
阅读剩余
版权声明:
作者:Tin
链接:http://www.tinstu.com/1997.html
文章版权归作者所有,未经允许请勿转载。
THE END