腾讯公开课

生产者本地消息表

在发送方,我采用的是本地消息表解决方案。简单来说,就是在业务操作的过程中,在本地消息表里面记录一条待发消息,做成一个本地数据库事务。然后尝试立刻发送消息,如果发送成功,那么就把本地消息表里对应的数据删除,或者把状态标记成已发送。 如果这个时候失败了,就可以立刻尝试重试。同时,还要有一个异步的补发机制,扫描本地消息表,找出已经过了一段时间,比如说三分钟,但是还没有发送成功的待发消息,然后补发。 最后提到的异步补发机制,你可以简单理解成有一个线程定时扫描数据库,找到需要发送但是又没有发送的消息发送出去。更直观的来说,就是这个线程会执行一个类似这样的 SQL

额外增加了一个新的列,用来控制重试的间隔和重试的次数。如果最终补发都失败了就会告警。这个时候就需要人手工介入了。 那么这时候本地消息表至少有两个关键列:一个是消息体列,里面存储了消息的数据;另一个是重试机制列,里面可以只存储重试次数,也可以存储重试间隔、已重试次数、最大重试次数。剩余的列,你就根据自己的需要随便加,不关键

案其实就是把一个分布式事务转变成本地事务 + 补偿机制。

分布式事务一般不用,通过更多优雅的手段来处理这些问题,这些都是常见问题,分布式环境丢数据都很正常

通过一些手段来多次校验保证即可

image.png

消息队列自身不丢

那么 acks 就需要设置成 all。而且,也不能允许 Kafka 使用unclean 选举。

进一步考虑刷盘的问题,那么就需要调整 log.flush.interval.messages、log.flush.interval.mslog.flush.scheduler.interval.ms 的值。

在关键业务上,我一般都是把 acks 设置成 all 并且禁用 unclean 选举,来确保消息发送到了消息队列上,而且不会丢失。同时 log.flush.interval.messages、log.flush.interval.mslog.flush.scheduler.interval.ms 三个参数会影响刷盘,这三个值我们公司设置的是10000、2000、3000。理论上来说,这种配置确实还是有一点消息丢失的可能,但是概率已经非常低了。只有一种可能,就是消息队列完成主从同步之后,主分区和 ISR 的从分区都没来得及刷盘就崩溃了,才会丢失消息。这个时候真丢失了消息,就只能人手工补发

消费者

消费者一般要保证幂等操作

异步处理要注意

一定要把消费逻辑设计成幂等的。你的微服务也要尽可能设计成幂等的,这样上游就可以利用重试来提高可用性了。另外我要说明一点,现在大多数消息中间件都声称自己实现了恰好一次(exactly once)语义,都是依赖于重试和幂等来达成的。

消息回查

事务消息 | RocketMQ