ActiveMQ:消息存储与持久化
介绍
此处持久化和之前的持久化的区别
MQ高可用:事务、可持久、签收,是属于MQ自身特性,自带的。这里的持久化是外力,是外部插件。之前讲的持久化是MQ的外在表现,现在讲的的持久是是底层实现。
是什么
持久化是什么?一句话就是:ActiveMQ宕机了,消息不会丢失的机制。
说明:为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制。ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。再试图将消息发给接收者,成功则将消息从存储中删除,失败则继续尝试发送。消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。
有哪些?
- AMQ Message Store
基于文件的存储机制,是以前的默认机制,现在不再使用。
AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本
- kahaDB
现在默认的。下面我们再详细介绍。
- JDBC消息存储
下面详细介绍。
- LevelDB消息存储
过于新兴的技术,现在有些不确定。2022 已经弃用
- JDBC Message Store with ActiveMQ Journal
下面详细介绍。
kahaDB
介绍
基于日志文件,从ActiveMQ5.4(含)开始默认的持久化插件。
官网文档:http://activemq.aache.org/kahadb
日志文件的存储目录在:activemq安装目录/data/kahadb
配置文件activemq.xml
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
说明
存储原理
JDBC消息存储
原理
步骤:
1.添加mysql数据库的驱动包到lib文件夹
https://mvnrepository.com/artifact/org.wisdom-framework/mysql-connector-java/5.1.34_1
2.修改配置文件activemq.xml。将之前的kahaDB替换为jdbc的配置。如下
<!--<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>
</persistenceAdapter>
createTableOnStartup
是否在启动时创建数据表,不写默认为true,一般在第一次启动设置为true,之后改为false
3.数据库连接池配置
将一下内容修改好后,粘贴到activemq.xml的 </broker> 与 <import resource="jetty.xml"/> 之间
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://mysql数据库URL/activemq?relaxAutoCommit=true"/>
<property name="username" value="mysql数据库用户名"/>
<property name="password" value="mysql数据库密码"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
<import resource="jetty.xml"/>
4.建仓SQL和键表说明
(1)建一个名为activemq的数据库
(2)上面设置createTablesOnStartup="true"
如果配置没有问题的话会自动生成 表和 字段
验证
queue验证和数据表变化
queue模式,非持久化不会将消息持久化到数据库。
queue模式,持久化会将消息持久化数据库。
点对点模式中,消息一旦被消费,就从broker 的相应文件或者数据库中删除!
(1)生产者开启持久化,然后启动,向mq发送消息成功后,查看数据库:
发现ACTIVEMQ_MSGS数据表多了3条数据
(2)在启动消费者消费消息,消息从数据库中被自动删除
(queue模式非持久化,不会持久化消息到数据表。)
topic验证和说明
topic代码:
(1)先启动一下持久化topic的消费者。看到ACTIVEMQ_ACKS数据表多了一条消息。
(2)启动持久化生产者发布3个数据,ACTIVEMQ_MSGS数据表新增3条数据,消费者消费所有的数据后,ACTIVEMQ_MSGS数据表的数据并没有消失。持久化topic的消息不管是否被消费,是否有消费者,产生的数据永远都存在,且只存储一条。这个是要注意的,持久化的topic大量数据后可能导致性能下降。这里就像公总号一样,消费者消费完后,消息还会保留。
总结
1.1 JDBC Message Store with ActiveMQ Journal
说明
这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
为了高性能,这种方式使用日志文件存储+数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比JDBC性能要高。
配置
下面是基于上面JDBC配置,再做一点修改:还是修改 activemq.xml
加入如下代码:
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data"/>
</persistenceFactory>
生产者生产消息,不会里面被持久化到数据库,但可以被消费,长时间没有被消费的消息,再进行持久化到数据库.
总结
jdbc效率低,kahaDB效率高,jdbc+Journal效率较高。
持久化消息主要指的是:MQ所在服务器宕机了消息不会丢试的机制。