首页 独家 > 正文

当前观点:一碰就头疼的 Kafka 消息重复问题,立马解决!

2023-01-23 18:19:42 架构师社区

文章来源:https://juejin.cn/post/7172897190627508237

一、前言


(资料图片仅供参考)

数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。

通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。

整理下消息重复的几个场景:

生产端:遇到异常,基本解决措施都是 重试

场景一: leader分区不可用了,抛 LeaderNotAvailableException异常,等待选出新 leader分区。 场景二: Controller所在 Broker挂了,抛 NotControllerException异常,等待 Controller重新选举。 场景三:网络异常、断网、网络分区、丢包等,抛 NetworkException异常,等待网络恢复。

消费端:poll一批数据,处理完毕还没提交 offset,机子宕机重启了,又会 poll上批数据,再度消费就造成了消息重复。

怎么解决?

先来了解下消息的三种投递语义:

最多一次(at most once):消息只发一次,消息可能会丢失,但绝不会被重复发送。例如: mqttQoS = 0至少一次(at least once):消息至少发一次,消息不会丢失,但有可能被重复发送。例如: mqttQoS = 1精确一次(exactly once):消息精确发一次,消息不会丢失,也不会被重复发送。例如: mqttQoS = 2

了解了这三种语义,再来看如何解决消息重复,即如何实现精准一次,可分为三种方法:

Kafka幂等性 Producer保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话) Kafka事务:保证生产端发送消息幂等。解决幂等 Producer的局限性。 消费端幂等:保证消费端接收消息幂等。蔸底方案。

1)Kafka幂等性 Producer

幂等性指:无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。

幂等性使用示例:在生产端添加对应配置即可

Properties props = newProperties();props.put(\"enable.idempotence\", ture); // 1. 设置幂等props.put(\"acks\", \"all\"); // 2. 当 enable.idempotence 为 true,这里默认为 allprops.put(\"max.in.flight.requests.per.connection\", 5); // 3. 注意
设置幂等,启动幂等。 配置 acks,注意:一定要设置 acks=all,否则会抛异常。

配置 max.in.flight.requests.per.connection需要 <= 5,否则会抛异常 OutOfOrderSequenceException

0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1

Kafka >= 1.1, max.in.flight.request.per.connection <= 5

为了更好理解,需要了解下 Kafka 幂等机制:

Producer每次启动后,会向 Broker申请一个全局唯一的 pid。(重启后 pid会变化,这也是弊端之一) Sequence Numbe:针对每个 都对应一个从0开始单调递增的 Sequence,同时 Broker端会缓存这个 seq num

判断是否重复:Broker里对应的队列 ProducerStateEntry.Queue(默认队列长度为 5)查询是否存在

如果 nextSeq == lastSeq + 1,即 服务端seq + 1 == 生产传入seq,则接收。

如果 nextSeq == 0 && lastSeq == Int.MaxValue,即刚初始化,也接收。

反之,要么重复,要么丢消息,均拒绝。

这种设计针对解决了两个问题:

消息重复:场景 Broker保存消息后还没发送 ack就宕机了,这时候 Producer就会重试,这就造成消息重复。 消息乱序:避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。

那什么时候该使用幂等:

如果已经使用 acks=all,使用幂等也可以。

如果已经使用 acks=0或者 acks=1,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。

2)Kafka事务

使用 Kafka事务解决幂等的弊端:单会话且单分区幂等。

Tips这块篇幅较长,这先稍微提及下使用,之后另起一篇。

事务使用示例:分为生产端 和 消费端

Properties props = newProperties();props.put(\"enable.idempotence\", ture); // 1. 设置幂等props.put(\"acks\", \"all\"); // 2. 当 enable.idempotence 为 true,这里默认为 allprops.put(\"max.in.flight.requests.per.connection\", 5); // 3. 最大等待数props.put(\"transactional.id\", \"my-transactional-id\"); // 4. 设定事务 idProducer producer = newKafkaProducer(props);// 初始化事务producer.initTransactions();try{// 开始事务producer.beginTransaction();// 发送数据producer.send(newProducerRecord(\"Topic\", \"Key\", \"Value\"));// 数据发送及 Offset 发送均成功的情况下,提交事务producer.commitTransaction();} catch(ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// 数据发送或者 Offset 发送出现异常时,终止事务producer.abortTransaction();} finally{// 关闭 Producer 和 Consumerproducer.close();consumer.close();}

这里消费端 Consumer需要设置下配置:isolation.level参数

read_uncommitted这是默认值,表明 Consumer能够读取到 Kafka写入的任何消息,不论事务型 Producer提交事务还是终止事务,其写入的消息都可以读取。如果你用了事务型 Producer,那么对应的 Consumer就不要使用这个值。

read_committed表明 Consumer只会读取事务型 Producer成功提交事务写入的消息。当然了,它也能看到非事务型 Producer写入的所有消息。

3)消费端幂等

“如何解决消息重复?” 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。

只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。

典型的方案是使用:消息表,来去重:

上述栗子中,消费端拉取到一条消息后,开启事务,将消息Id新增到本地消息表中,同时更新订单信息。

如果消息重复,则新增操作 insert会异常,同时触发事务回滚。

二、案例:

Kafka 幂等性 Producer 使用

环境搭建可参考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic

准备工作如下:

1、Zookeeper:本地使用 Docker启动
$docker run -d --name zookeeper -p 2181:2181 zookeepera86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4
2、Kafka:版本 2.7.1,源码编译启动(看上文源码搭建启动) 3、启动生产者: Kafka源码中 exmaple

4、启动消息者:可以用 Kafka提供的脚本

#举个栗子:topic 需要自己去修改$cd./kafka-2.7.1-src/bin$./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

创建 topic1副本,2 分区

$./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2#查看$./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe

生产者代码:

publicclassKafkaProducerApplication {privatefinal Producer producer;final StringoutTopic;publicKafkaProducerApplication(final Producer producer,final Stringtopic) {this.producer = producer;outTopic = topic;}publicvoidproduce(final Stringmessage) {final String[] parts = message.split(\"-\");final Stringkey, value;if(parts.length > 1) {key = parts[0];value = parts[1];} else{key = null;value = parts[0];}final ProducerRecord producerRecord = newProducerRecord<>(outTopic, key, value);producer.send(producerRecord,(recordMetadata, e) -> {if(e != null) {e.printStackTrace();} else{System.out.println(\"key/value \"+ key + \"/\"+ value + \"\twritten to topic[partition] \"+ recordMetadata.topic() + \"[\"+ recordMetadata.partition() + \"] at offset \"+ recordMetadata.offset());}});}publicvoidshutdown() {producer.close();}publicstaticvoidmain(String[] args) {final Properties props = newProperties();props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, \"true\");props.put(ProducerConfig.ACKS_CONFIG, \"all\");props.put(ProducerConfig.CLIENT_ID_CONFIG, \"myApp\");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);final Stringtopic = \"myTopic\";final Producer producer = newKafkaProducer<>(props);final KafkaProducerApplication producerApp = newKafkaProducerApplication(producer, topic);StringfilePath = \"/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt\";try{List linesToProduce = Files.readAllLines(Paths.get(filePath));linesToProduce.stream().filter(l -> !l.trim().isEmpty()).forEach(producerApp::produce);System.out.println(\"Offsets and timestamps committed in batch from \"+ filePath);} catch(IOException e) {System.err.printf(\"Error reading file %s due to %s %n\", filePath, e);} finally{producerApp.shutdown();}}}

启动生产者后,控制台输出如下:

启动消费者:

$ ./kafka-console-consumer.sh--bootstrap-server localhost:9092--topic myTopic

修改配置 acks

启用幂等的情况下,调整 acks配置,生产者启动后结果是怎样的:

修改配置 acks = 1

修改配置 acks = 0

会直接报错:

Exception in thread \"main\" org.apache.kafka.common.config.ConfigException: Must setacks toall inordertousethe idempotent producer. Otherwise we cannot guaranteeidempotence.

修改配置 max.in.flight.requests.per.connection

启用幂等的情况下,调整此配置,结果是怎样的:

max.in.flight.requests.per.connection > 5会怎样?

当然会报错:

Causedby: org.apache.kafka.common.config.ConfigException: Mustsetmax.in.flight.requests.per.connectiontoatmost5 tousetheidempotentproducer.

关键词: 提交事务 的情况下 这个问题

责任编辑:宋璟

返回首页
相关新闻
返回顶部