RocketMq\Kafka如何保障消息不丢失?

news/2025/2/22 21:37:56

程序那点事

保证RocketMq和Kafka消息不丢失需考虑Producer发送、Broker存储、Consumer消费。需配置同步发送/刷盘、重试机制、幂等性生产,手动提交偏移量等策略。摘要由作者通过智能技术生成

RocketMq架构图

RocketMq消息不丢失

要想保证消息不丢失,需要从以下几个方面考虑:

  • Producer 发送消息

  • Broker 主从切换、保存消息

  • Consumer 消费消息

发送端考虑

同步发送

同步发送会返回 4 个状态码:

  • SEND_OK:消息发送成功。需要注意的是,消息发送到 broker 后,还有两个操作:消息刷盘和消息同步到 slave 节点,默认这两个操作都是异步的,只有把这两个操作都改为同步,SEND_OK 这个状态才能真正表示发送成功 

  • FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时。

  • FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时。

  • SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用。

根据返回的状态码,可以做消息重试,这里设置的重试次数是 3。(消息重试时,消费端需要做去重处理)

异步发送

异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。

Broker端考虑

刷盘策略

  • 异步刷盘:默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。

  • 同步刷盘:消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置:flushDiskType=SYNC_FLUSH

Broker多副本和高可用

Broker 为了保证高可用,采用一主多从的方式部署。或多主多从。

消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。

这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:brokerRole=SYNC_MASTER

改为同步复制后,消息复制流程如下:

  1. slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;

  2. master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;

  3. slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;

  4. master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。

消费端考虑

消费成功

如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。

消费失败重试

Consumer 消费失败,这里有 3 种情况:

  • 返回 RECONSUME_LATER

  • 返回 null

  • 抛出异常

Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。
注意:

  • Broker 默认最多重试 16 次,如果重试 16 次都失败,就把这条消息放入死信队列,Consumer 可以订阅死信队列进行消费。

  • 重试只有在集群模式(MessageModel.CLUSTERING)下生效,在广播模式(MessageModel.BROADCASTING)下是不生效的。

  • Consumer 端一定要做好幂等处理。
    其实重试 3 次都失败就可以说明代码有问题,这时 Consumer 可以把消息存入本地并告警进行人工处理,给 Broker 返回CONSUME_SUCCESS 来结束重试。

极端情况

如果对消息丢失零容忍,我们必须要考虑极端情况,比如整个 RocketMQ 集群挂了,这时 Producer 端发送消息一定会失败,可以考虑在 Producer 端做降级,把要发送的消息保存到本地数据库或磁盘,等 RocketMQ 恢复以后再把本地消息推送出去。

Kafka

新版本的kafka, 已经不再依赖zookeeper.

对于kafka要保障消息的不丢失,同样需要考虑Producer、Broker、Consumer等情况。

生产者端

1. 配置正确的确认机制(acks 参数)

  • acks = 0:生产者发送消息后,不会等待 Broker 的确认,直接认为消息发送成功。这种方式性能最高,但可靠性最差,因为如果消息在发送过程中丢失,生产者无法得知。

  • acks = 1:生产者发送消息后,只要 Leader 分区成功写入消息,就会收到 Broker 的确认。这种方式在一定程度上保证了消息的可靠性,但如果 Leader 分区在确认消息后、将消息同步到 Follower 分区之前发生故障,消息仍然可能丢失。

  • acks = all 或 acks = -1:生产者发送消息后,需要等待 Leader 分区和所有 ISR(In - Sync Replicas,与 Leader 分区保持同步的副本集合)中的 Follower 分区都成功写入消息,才会收到 Broker 的确认。这种方式提供了最高的可靠性,但会降低性能,因为需要等待多个副本的写入操作完成。

2. 重试机制

生产者可以配置重试次数,当消息发送失败时,Kafka 会自动进行重试。例如,网络抖动、Broker 暂时不可用等情况导致的发送失败,通过重试机制可以提高消息发送的成功率。可以通过设置 retries 参数来指定重试次数,同时可以设置 retry.backoff.ms 参数来控制重试间隔。

3. 幂等性生产

Kafka 从 0.11.0.0 版本开始引入了幂等性生产者的概念。开启幂等性生产后,生产者会为每条消息生成一个唯一的 ID,Broker 会对消息进行去重处理,确保相同 ID 的消息只会被写入一次。可以通过设置 enable.idempotence 参数为 true 来开启幂等性生产。

Broker 端

1. 多副本机制

Kafka 采用多副本机制来提高消息的可靠性。每个分区都可以有多个副本,其中一个副本作为 Leader 分区,负责处理读写请求,其他副本作为 Follower 分区,从 Leader 分区同步消息。当 Leader 分区发生故障时,Kafka 会自动从 ISR 集合中选择一个 Follower 分区作为新的 Leader 分区,继续提供服务。可以通过设置 replication.factor 参数来指定分区的副本数,一般建议设置为 3 或以上。

2. 刷盘策略

Kafka 的消息是先写入内存中的缓冲区,然后定期刷盘。可以通过配置 flush.messages 和 flush.ms 参数来控制刷盘的频率。例如,设置 flush.messages = 1 表示每写入一条消息就刷盘一次,这样可以保证消息的持久化,但会降低性能;设置 flush.ms = 1000 表示每 1 秒刷盘一次,在性能和可靠性之间进行了平衡。

3. 合理配置 ISR

ISR 集合中的副本与 Leader 分区保持同步,如果某个 Follower 分区落后 Leader 分区太多,会被踢出 ISR 集合。可以通过设置 min.insync.replicas 参数来指定 ISR 集合中最少需要保持同步的副本数。当生产者设置 acks = all 时,如果 ISR 集合中的副本数小于 min.insync.replicas,生产者会收到错误信息,从而避免消息丢失。

消费者端

1. 手动提交偏移量

Kafka 消费者在消费消息时,会记录消费的偏移量(offset),表示已经消费到的消息位置。默认情况下,消费者会自动提交偏移量,但这种方式可能会导致消息丢失。建议使用手动提交偏移量的方式,在消息处理完成后再提交偏移量,确保消息不会因为消费者在处理过程中崩溃而丢失。可以通过设置 enable.auto.commit 参数为 false 来关闭自动提交偏移量,然后使用 commitSync() 或 commitAsync() 方法手动提交偏移量。

2. 处理消费异常

消费者在消费消息时,可能会遇到各种异常情况,如网络异常、业务逻辑异常等。需要在代码中捕获这些异常,并进行相应的处理,确保消息不会因为异常而丢失。例如,可以将异常消息记录下来,进行重试或人工处理。

作者:许Web

链接:https://juejin.cn/post/7469051793236295680


http://www.niftyadmin.cn/n/5862783.html

相关文章

《重构-》

一、代码坏的味道 神秘命名 ​​​​​代码应该直观明了。要深思熟虑如何给函数、模块、变量和类命名,使它们能清晰地表明 自己的功能和用法。 重复代码 一旦有重复代码存在,阅读这些重复的代码时你就必须加倍仔细,留意其间细微的差异。如果…

git使用-克隆远程项目、分支管理

文章目录 克隆远程项目到本地1. 远程找到需要克隆的项目,复制ssh地址2. idea开启git版本控制(如果已经开了,忽略此步骤)3. clone远端项目4. 克隆完成 分支管理1. 新建分支2. 切换分支3. 合并分支4. 储存变化 克隆远程项目到本地 …

软件架构设计:软件工程

一、软件工程概述 软件工程的定义 软件工程是应用系统化、规范化、可量化的方法开发、运行和维护软件。 软件工程的目标 提高软件质量、降低开发成本、缩短开发周期。 软件生命周期 瀑布模型:需求分析→设计→编码→测试→维护。迭代模型:分阶段迭代开…

【Leetcode 每日一题】2209. 用地毯覆盖后的最少白色砖块

问题背景 给你一个下标从 0 0 0 开始的 二进制 字符串 f l o o r floor floor,它表示地板上砖块的颜色。 f l o o r [ i ] floor[i] floor[i] 为 ‘0’ 表示地板上第 i i i 块砖块的颜色是 黑色 。 f l o o r [ i ] floor[i] floor[i] 为’1’ 表示地板上第 i …

【够用就好005】-在VSCode中管理ECS服务器的实操步骤

前景提要:接触过云服务器,当前有一个可以使用的ecs服务器。 关于如何搭建配置云服务不在今天分享主题内。 亲测有效!!! 通过 VSCode 直接配置服务器步骤 一.先安装ssh插件 CTRL shift x 插件界面输入ssh安装remot…

Golang连接使用SqlCipher

一、准备环境 需要下载MinGW、msys2、OpenSSL,并且注意都需要64位 已经整理成环境软件包,只需要下载,并配置环境变量 链接: https://pan.baidu.com/s/1NxF8aWqx7s97ntACOk77Ug 提取码: yhrv 二、代码 package mainimport ("database/s…

CTA策略【量化理论】

CTA策略演变史 全称:Commodity Trading Advisor (商品交易顾问) CTA最开始是指通过为客户提供期权、期货方面的交易建议,或者直接通过受管理的期货账户参与实际交易,来获得收益的机构或个人。 随着市场的发展&#…

2024华为OD机试真题-单词接龙(C++)-E卷B卷-100分

2024华为OD机试最新题库-(C卷+D卷+E卷)-(JAVA、Python、C++) 目录 题目描述: 输入描述: 输出描述: 示例1 示例2 题目解析 考点 代码 c++ 题目描述: 单词接龙的规则是:可用于接龙的单词首字母必须要前一个单词的尾字母相同; 当存在多个首字母相同的单词时,取…