使用消息队列怎样防止消息重复?

news/2025/2/27 11:37:51

大家好,我是君哥。

使用消息队列时,我们经常会遇到一个可能对业务产生影响的问题,消息重复。在订单、扣款、对账等对幂等有要求的场景,消息重复的问题必须解决。

那怎样应对重复消息呢?今天来聊一聊这个话题。

1.三个语义

正确使用消息队列,我们会考虑到消息防丢失、防重复,我们介绍 3 个语义:

  • At Least Once:在消息队列中,指消息不丢失,一条消息最少被消费一次,但是可能会有重复消费。

  • Exactly Once:在消息队列中,消息被精准消费一次,不丢失,也不会重复;

  • At Most Once:在消息队列中,消息不会被重复消费,但是可能会有消息丢失

不同的消息场景,需要的语义不同。比如 Exactly Once 最难实现,一般需要引入事务消息。

不同使用场景,对语义的要求也不一样。比如日志收集类的场景,At Most Once 就可以满足,而支付类的场景则要求 Exactly Once。

2.消息重复

什么情况下会导致消息重复呢?

生产者发送消息后,Broker 保存成功,但是没有成功给生产者返回 ACK,生产者以为消息发送失败,重试,再次给 Broker 发送。Broker 保存了重复消息,导致 Consumer 多次消费。

图片

消费者消费消息后,给 Broker 返回 ACK 失败,导致 Broker 没有修改偏移量,同一条消息再次发送给消费者,或者被消费者拉取到。

图片

3.生产者防重

有的消息中间件是支持生产者幂等的。比如 Kafka 从 0.11.0 版本开始引入了幂等 Producer,可以使用下面代码开启幂等 Producer:

Properties props = new Properties();
//省略其他代码
//配置幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); 
//创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Kafka 实现生产者幂等的原理是在生产者引入了 Producer ID(PID)和 Sequence Number 这两个参数。

  • PID:Producer 拥有的 ID,唯一标识一个 Producer。

  • Sequence Number:自增的数值,唯一标识同一个 Producer 发送到指定分区的消息 ID。

有了这两个参数,Broker 单分区就可以唯一标识一个生产者发送的唯一一条消息<PID,SequenceNumber>。Broker 收到消息时,如果检查到消息的<PID,SequenceNumber>已经存在,就不会再保留这条消息。

但幂等 Producer 只能在单分区下生效,多分区情况下是不生效的。因为多个分区之间并不能相互访问对方的<PID,SequenceNumber>。

图片

4.Broker 防重

Broker 如果可以防重,那对于生产者和消费者来说,节省了大量的工作。下面我们看下 Pulsar 是怎样防重的。

Broker 通过参数 BrokerDeduplicationEnabled 开启防重功能。对于 Producer 发送的重复消息,Broker 返回响应 -1:-1。

Producer 发送消息时,会带一个 sequenceId 字段,Broker 会按照 ProducerName 维度记录当前生产者最大的 sequenceId(highestSequenceId)。Broker 收到消息时,首先会判断消息中的 sequenceId 是否大于自己保存的当前生产者的 highestSequenceId,如果是则保存消息并更新 highestSequenceId,否则丢弃消息,并且给 Producer 返回 -1:-1。

下面是三个极端情况:

  1. Producer 断开连接:这种情况下,跟 Broker 重新建立连接后,本地保存的 sequenceId 还在,只要使用 sequenceId 递增后发送消息即可;

  2. Producer 宕机:Producer 重启后,缓存的 sequenceId 肯定不存在了,这时跟 Broker 重新建立连接后,Broker 会根据 ProducerName 找出 highestSequenceId 发给 Producer,Producer 使用这个 sequenceId 来发送消息;

  3. Producer 和 Broker 都宕机:Broker 重启后,可以从宕机前保存的快照中恢复各 Producer 对应的 highestSequenceId 发送给各 Producer。但这个 highestSequenceId 不一定准确,因为 Broker 宕机瞬间很有可能最新的 sequenceId 没有来得及保存快照。

需要注意的是,跟 Kafka 的幂等 Producer 类似,Pulsar 的 Broker 幂等也只能保证 Topic/Partition 级别。

5.消费者防重

从上面的分析可以看出,靠生产者防重和 Broker 防重,只能在 Topic/Partition 级别生效,这通常并不能满足我们的需求。而为了避免消费者重复消费对业务造成影响,消息防重还是必要的。这就要求我们做最后一道防线,在消费端进行防重或幂等处理。

消费端做防重,就不再考虑消息中间件层面的配置(比如 sequenceId),而是从消息体进行下手。

生产者发送消息时,给消息体赋值一个全局唯一的 ID,消费者处理消息时,根据全局唯一 ID 做防重。

比如消费端的逻辑是保存一条订单消息,那把唯一 ID 保存到数据库并且加一个唯一索引,这样根据唯一索引就可以做消息去重。

不过使用唯一索引也有缺点:

  • 如果使用 MySQL 数据库,不能使用 Change Buffer;

  • 非插入的场景(比如更新库存)不能去重。

对于唯一索引的缺点,我们可以引入 Redis 对唯一 ID 做保存,利用 setNx 判断消息是否已经处理过。如下图:

图片

if (jedis.setnx(ID, "1") == 1) {
 //处理业务,返回 ACK
}else {
    //直接返回 返回 ACK
}

6.总结

使用消息队列,在一些场景下是需要防重的。主流消息队列提供了一些防重的能力,但并不是完全可靠的。在对重复消息敏感的场景下,最好是在消费端处理消息时,从业务层面进行消息防重。


http://www.niftyadmin.cn/n/5870053.html

相关文章

基于阿里云PAI平台快速部署DeepSeek大模型实战指南

一、DeepSeek大模型&#xff1a;企业级AI应用的新标杆 1.1 为什么选择DeepSeek&#xff1f; 近期&#xff0c;DeepSeek系列模型凭借其接近GPT-4的性能和开源策略&#xff0c;成为全球开发者关注的焦点。在多项国际评测中&#xff0c;DeepSeek-R1模型在推理能力、多语言支持和…

地基JDK8新特性之Lambda 表达式和Stream 流操作

一、Lambda 表达式基础 1. 替代匿名内部类 // 传统写法 Runnable r1 new Runnable() {Overridepublic void run() {System.out.println("Hello World");} };// Lambda 写法 Runnable r2 () -> {System.out.println("hello");}; 2. 函数式接口排序…

7. 覆盖率:covergroup/coverpoint/cross

文章目录 前言一、核心概念剖析1. covergroup‌2.coverpoint‌3. cross‌4. 覆盖率三要素对比表 二、实现模式指南2.1 covergroup2.2 coverpoint2.3 cross2.3 拓展知识1. 智能bins生成‌2. 权重控制‌3. 条件覆盖‌4. 自动分仓5. 手动分仓6. 条件过滤 三、典型应用场景3.1 cove…

15.代码随想录算法训练营第十五天|(递归)110. 平衡二叉树,257. 二叉树的所有路径*,404. 左叶子之和,222.完全二叉树的节点个数[打卡自用]

15.代码随想录算法训练营第十五天|&#xff08;递归&#xff09;110. 平衡二叉树&#xff0c;257. 二叉树的所有路径*&#xff0c;404. 左叶子之和&#xff0c;222.完全二叉树的节点个数 给定一个二叉树&#xff0c;判断它是否是 平衡二叉树 示例 1&#xff1a; 输入&#xf…

在 macOS 系统上安装 kubectl

在 macOS 系统上安装 kubectl 官网&#xff1a;https://kubernetes.io/zh-cn/docs/tasks/tools/install-kubectl-macos/ 用 Homebrew 在 macOS 系统上安装 如果你是 macOS 系统&#xff0c;且用的是 Homebrew 包管理工具&#xff0c; 则可以用 Homebrew 安装 kubectl。 运行…

如何解决svn st中出现!(冲突)的问题

在 SVN&#xff08;Subversion&#xff09;中&#xff0c;svn status 命令用于查看工作副本的状态。当你看到 ! 符号时&#xff0c;通常表示文件或目录在工作副本中丢失&#xff08;missing&#xff09;。以下是解决这个问题的步骤&#xff1a; 1. 理解 ! 的含义 ! 表示该文件…

【2025全网最新最全】前端Vue3框架的搭建及工程目录详解

文章目录 安装软件Node.js搭建Vue工程创建Vue工程精简Vue项目文件 Vue工程目录的解读网页标题的设置设置全局样式路由配置 安装软件Node.js 下载地址&#xff1a;https://nodejs.org/zh-cn/ 安装完成后&#xff0c;打开cmd,查看环境是否准备好 node -v npm -vnpm使用之前一定…

基于Spring Boot的健康医院门诊在线挂号系统设与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…