当前观点:一碰就头疼的 Kafka 消息重复问题,立马解决!
文章来源:https://juejin.cn/post/7172897190627508237
一、前言
(资料图片仅供参考)
数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。
通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。
整理下消息重复的几个场景:
生产端:遇到异常,基本解决措施都是 重试。
场景一:leader
分区不可用了,抛 LeaderNotAvailableException
异常,等待选出新 leader
分区。 场景二: Controller
所在 Broker
挂了,抛 NotControllerException
异常,等待 Controller
重新选举。 场景三:网络异常、断网、网络分区、丢包等,抛 NetworkException
异常,等待网络恢复。 消费端:poll
一批数据,处理完毕还没提交 offset
,机子宕机重启了,又会 poll
上批数据,再度消费就造成了消息重复。
怎么解决?
先来了解下消息的三种投递语义:
最多一次(at most once
):消息只发一次,消息可能会丢失,但绝不会被重复发送。例如: mqtt
中 QoS = 0
。 至少一次(at least once
):消息至少发一次,消息不会丢失,但有可能被重复发送。例如: mqtt
中 QoS = 1
精确一次(exactly once
):消息精确发一次,消息不会丢失,也不会被重复发送。例如: mqtt
中 QoS = 2
。 了解了这三种语义,再来看如何解决消息重复,即如何实现精准一次,可分为三种方法:
Kafka
幂等性 Producer
:保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话) Kafka
事务:保证生产端发送消息幂等。解决幂等 Producer
的局限性。 消费端幂等:保证消费端接收消息幂等。蔸底方案。 1)Kafka
幂等性 Producer
幂等性指:无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。
幂等性使用示例:在生产端添加对应配置即可
Properties props = newProperties();props.put(\"enable.idempotence\", ture); // 1. 设置幂等props.put(\"acks\", \"all\"); // 2. 当 enable.idempotence 为 true,这里默认为 allprops.put(\"max.in.flight.requests.per.connection\", 5); // 3. 注意
设置幂等,启动幂等。 配置 acks
,注意:一定要设置 acks=all
,否则会抛异常。 配置 max.in.flight.requests.per.connection
需要 <= 5
,否则会抛异常 OutOfOrderSequenceException
。
0.11 >= Kafka < 1.1
, max.in.flight.request.per.connection = 1
Kafka >= 1.1
, max.in.flight.request.per.connection <= 5
为了更好理解,需要了解下 Kafka 幂等机制:
Producer
每次启动后,会向 Broker
申请一个全局唯一的 pid
。(重启后 pid
会变化,这也是弊端之一) Sequence Numbe
:针对每个
都对应一个从0开始单调递增的 Sequence
,同时 Broker
端会缓存这个 seq num
判断是否重复:拿
去 Broker
里对应的队列 ProducerStateEntry.Queue
(默认队列长度为 5)查询是否存在
如果 nextSeq == lastSeq + 1
,即 服务端seq + 1 == 生产传入seq
,则接收。
如果 nextSeq == 0 && lastSeq == Int.MaxValue
,即刚初始化,也接收。
反之,要么重复,要么丢消息,均拒绝。
这种设计针对解决了两个问题:
消息重复:场景Broker
保存消息后还没发送 ack
就宕机了,这时候 Producer
就会重试,这就造成消息重复。 消息乱序:避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。 那什么时候该使用幂等:
如果已经使用acks=all
,使用幂等也可以。 如果已经使用 acks=0
或者 acks=1
,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。
2)Kafka
事务
使用
Kafka
事务解决幂等的弊端:单会话且单分区幂等。
Tips
:这块篇幅较长,这先稍微提及下使用,之后另起一篇。
事务使用示例:分为生产端 和 消费端
Properties props = newProperties();props.put(\"enable.idempotence\", ture); // 1. 设置幂等props.put(\"acks\", \"all\"); // 2. 当 enable.idempotence 为 true,这里默认为 allprops.put(\"max.in.flight.requests.per.connection\", 5); // 3. 最大等待数props.put(\"transactional.id\", \"my-transactional-id\"); // 4. 设定事务 idProducer producer = newKafkaProducer(props);// 初始化事务producer.initTransactions();try{// 开始事务producer.beginTransaction();// 发送数据producer.send(newProducerRecord(\"Topic\", \"Key\", \"Value\"));// 数据发送及 Offset 发送均成功的情况下,提交事务producer.commitTransaction();} catch(ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// 数据发送或者 Offset 发送出现异常时,终止事务producer.abortTransaction();} finally{// 关闭 Producer 和 Consumerproducer.close();consumer.close();}
这里消费端 Consumer
需要设置下配置:isolation.level
参数
read_uncommitted
:这是默认值,表明 Consumer
能够读取到 Kafka
写入的任何消息,不论事务型 Producer
提交事务还是终止事务,其写入的消息都可以读取。如果你用了事务型 Producer
,那么对应的 Consumer
就不要使用这个值。
read_committed
:表明 Consumer
只会读取事务型 Producer
成功提交事务写入的消息。当然了,它也能看到非事务型 Producer
写入的所有消息。 3)消费端幂等
“如何解决消息重复?” 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。
只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。
典型的方案是使用:消息表,来去重:
上述栗子中,消费端拉取到一条消息后,开启事务,将消息Id
新增到本地消息表中,同时更新订单信息。
如果消息重复,则新增操作 insert
会异常,同时触发事务回滚。
二、案例:
Kafka 幂等性 Producer 使用
环境搭建可参考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic
准备工作如下:
1、Zookeeper
:本地使用 Docker
启动 $docker run -d --name zookeeper -p 2181:2181 zookeepera86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4
2、Kafka
:版本 2.7.1
,源码编译启动(看上文源码搭建启动) 3、启动生产者: Kafka
源码中 exmaple
中 4、启动消息者:可以用 Kafka
提供的脚本
#举个栗子:topic 需要自己去修改$cd./kafka-2.7.1-src/bin$./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
创建 topic
:1副本,2 分区
$./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2#查看$./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe
生产者代码:
publicclassKafkaProducerApplication {privatefinal Producer producer;final StringoutTopic;publicKafkaProducerApplication(final Producer producer,final Stringtopic) {this.producer = producer;outTopic = topic;}publicvoidproduce(final Stringmessage) {final String[] parts = message.split(\"-\");final Stringkey, value;if(parts.length > 1) {key = parts[0];value = parts[1];} else{key = null;value = parts[0];}final ProducerRecord producerRecord = newProducerRecord<>(outTopic, key, value);producer.send(producerRecord,(recordMetadata, e) -> {if(e != null) {e.printStackTrace();} else{System.out.println(\"key/value \"+ key + \"/\"+ value + \"\twritten to topic[partition] \"+ recordMetadata.topic() + \"[\"+ recordMetadata.partition() + \"] at offset \"+ recordMetadata.offset());}});}publicvoidshutdown() {producer.close();}publicstaticvoidmain(String[] args) {final Properties props = newProperties();props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, \"true\");props.put(ProducerConfig.ACKS_CONFIG, \"all\");props.put(ProducerConfig.CLIENT_ID_CONFIG, \"myApp\");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);final Stringtopic = \"myTopic\";final Producer producer = newKafkaProducer<>(props);final KafkaProducerApplication producerApp = newKafkaProducerApplication(producer, topic);StringfilePath = \"/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt\";try{List linesToProduce = Files.readAllLines(Paths.get(filePath));linesToProduce.stream().filter(l -> !l.trim().isEmpty()).forEach(producerApp::produce);System.out.println(\"Offsets and timestamps committed in batch from \"+ filePath);} catch(IOException e) {System.err.printf(\"Error reading file %s due to %s %n\", filePath, e);} finally{producerApp.shutdown();}}}
启动生产者后,控制台输出如下:
启动消费者:
$ ./kafka-console-consumer.sh--bootstrap-server localhost:9092--topic myTopic
修改配置 acks
启用幂等的情况下,调整 acks
配置,生产者启动后结果是怎样的:
修改配置 acks = 1
acks = 0
会直接报错:
Exception in thread \"main\" org.apache.kafka.common.config.ConfigException: Must setacks toall inordertousethe idempotent producer. Otherwise we cannot guaranteeidempotence.
修改配置 max.in.flight.requests.per.connection
启用幂等的情况下,调整此配置,结果是怎样的:
将 max.in.flight.requests.per.connection > 5
会怎样?
当然会报错:
Causedby: org.apache.kafka.common.config.ConfigException: Mustsetmax.in.flight.requests.per.connectiontoatmost5 tousetheidempotentproducer.
责任编辑:宋璟
-
银行行长表示:若存款金额达到“这个数”,可以和银行谈利率
-
《父辈的荣耀》:林晓晴喜怀双胞胎急需输血 顾长山竟是她亲爹
-
家中千万现金四处藏,新竹县工务处长、厂商等4人全收押
-
切实守护人民群众生命财产安全
-
中国电动车企积极布局东盟市场
-
四大行下调房贷利率 购房者能省多少钱
-
若可以攻玉(关于若可以攻玉简述)
-
半场战报:斯洛伐克0-1葡萄牙,B费生日夜破门,施兰茨中柱
-
教师节前夕,南航为 109 名教师举行荣休仪式
-
婴儿游泳设备(婴儿游泳设备)
-
深圳罗湖一小区旁菜地伊蚊较多,疾控专家:有传播登革热风险
-
(台青话融合)“80后”台青大陆逐梦十载 深耕金融市场发掘创业商机
-
第四范式通过港交所聆讯,亏损比率收窄,或成人工智能上市风向标
-
研究团队开发出完全依靠氢气运行的2升级氢发动机
-
51岁男子与妻子分床22年,说:她6个月就生了,我不舒服
-
信阳接种第二剂次疫苗应该注意哪些事项?
-
苹果两天跌掉1900亿美元?Wedbush:别慌,还能再涨20%!
-
凤凰传媒: 公司及公司大股东目前暂无回购或增持安排,如有相关计划公司将会及时履行信息披露义务
-
反犬字旁的字有哪些大全(反犬字旁的字有哪些)
-
三星g9198上市时间和价格(三星g9198)
-
被指控试图推翻大选结果,特朗普:不认罪
-
点绿成金 上海农商银行打造碳金融服务闭环
-
曝光!吉安4家单位存在重大火灾隐患
-
安能物流上半年 毛利5.52亿元增长超一倍
-
53.7GW!1-8月13家整机商中标统计!金风、远景、运达位居前三!
-
广西玉林天天网(关于广西玉林天天网的基本详情介绍)
-
八月的最后一场浪漫晚霞,你赶上了吗?
-
张让三国杀图片(张让三国杀)
-
【国君非银刘欣琦团队|中金公司23年中报点评】汇兑收益稳定业绩,机构业务加速发展
-
看到女友发视频夸婆婆一查她已婚,男子起诉要回转账欠款
-
米体:米兰仍未与塔雷米谈妥奖金和付款方式,并开始感到不耐烦
-
成都到九寨沟汽车站时刻表(成都到九寨沟汽车)
-
中国铁建:聘任王立新为公司总裁、朱宏标为公司总会计师
-
张强同学家的高压锅盖上标有“××铝制品厂18cm压力锅”的字样他测得高压锅限压阀的质量为70g排气孔
-
工商银行龙凤呈祥金条50克价格今天多少一克(2023年08月31日)