RabbitMQ消息事务与确认机制
事务机制
基于 AMQP 实现了事务机制,类似于MySQL的事务
RabbitMQ提供了三个方法对消息发送进行事务管理:
txSelect()
:用于将通道Channel
开启事务模式,服务端会返回Tx.Select-OK
txCommit()
:用于提交事务txRollback()
:用于回滚事务
使用格式如下:
1 |
|
原理
开启事务后,消息起初并不会到指定的队列中
而是首先发送到一个临时队列中
只有当调用了txCommit()
后,刚刚存储到临时队列中的消息才会到指定的队列中去
缺点
开启-提交-回滚,三次操作,每次都相当于一次请求,降低了消息的吞吐量
因为走的通信太多,大量消息就会大量请求服务器,这样会非常耗时
注意
在消费者中,要将 autoACK
设置为 false
,手动提交ack
而为true
是不支持事物的,也就是说即使在收到消息之后回滚事务也是无济于事的,因为队列已经把消息移除了
确认机制
基于confirm
模式实现确认机制
考虑到AMQP
的事务机制性能消耗大
RabbitMQ提供了另一种低消耗的事务管理方式,使用 confirmSelect()
方法
原理
confirm
模式下的 channel
发送的消息会生成一个唯一的有序 ID (从1开始)
一旦消息成功发送到相应的队列之后,RabbitMQ服务端 会发送给生产者一个确认标志,包含消息的 ID
这样生产者就知道该消息已经发送成功了
如果消息和队列是持久化的,那么只有当 RabbitMQ服务器将消息成功写入到 磁盘之后,服务端才会发送确认标志
此外,服务端也可以设置basic.ack
和 mutiple
域,表明是否是批量确认的消息,即该序号之前的消息都已经收到了
confirm
的机制是异步的,生产者可以在等待的同时继续发送下一条消息,并且异步等待回调处理
- 消息发送成功,会返回
ack
消息供异步处理 - 消息发送失败,会返回
nack
消息
confirm
的事件没有明确说明,并且同一个消息只会被 confirm
一次
处理 ack
和 nack
的方式有三种
串行 confirm
1 |
|
其中waitForConfirms
可以换成带有时间参数的方法waitForConfirms(Long mills)
指定等待响应时间
批量 confirm
每发送一批次消息就调用waitForConfirms()
方法等待服务端confirm
1 |
|
批量的方法从数量级上降低了confirm的性能消耗,提高了效率。
但是有个致命的缺陷,一旦回复确认失败,当前确认批次的消息会全部重新发送,导致消息重复发送。
所以批量的confirm虽然性能提高了,但是消息的重复率也提高了。
异步 confirm
Channel
对象提供的 ConfirmListener()
回调方法只包含deliveryTag
(当前Channel
发出的消息序列号)
我们需要自己为每一个Channel
维护一个unconfirm
的序列号的集合
- 每
push
一条数据,集合元素加1 - 每回调一次
handleAck
方法,unconfirm
集合就删掉相应的一条(multiple=false
) 或者多条(multiple=true
)记录 - 从程序运行效率上来看,这个
unconfirm
集合最好采用有序集合SortedSet
存储结构。
使用监听方法,当服务端confirm
了一条或多条消息后,调用回调方法
1 |
|
每一个comfirm
的通道维护一个集合,每发送一条数据,集合增加一个元素,每异步响应一条ack
或者nack
的数据,集合删除一条。
SortedSet
是一个有序的集合,它的有序是值大小的有序,不是插入时间的有序。
JDK中waitForConfirms()
方法也是使用了SortedSet
集合