2024-04-03  阅读(16)
原文作者:吴声子夜歌 原文地址: https://blog.csdn.net/cold___play/article/details/132287669

无消息丢失配置

Java版本producer用户采用异步发送机制。KafkaProducer.send方法仅仅把消息放入缓冲区中,由一个专属I/O线程负责从缓冲区中提取消息并封装进消息batch中,然后发送出去。显然,这个过程中存在着数据丢失的窗口:若I/O线程发送之前producer崩溃,则存储缓冲区中的消息全部丢失了。这是producer需要处理的很重要的问题。

producer的另一个问题就是消息的乱序。假设客户端依次执行下面的语句发送两条消息到相同的分区:

    producer.send(record1);
    producer.send(record2);

若此时由于某些原因(比如瞬时的网络抖动)导致record1未发送成功,同时Kafka又配置了重试机制以及max.in.flight.requests.per.connection大于1(默认值是5),那么producer重试recordl成功后,recordl在日志中的位置反而位于record2之后,这样造成了消息的乱序。要知道很多实际使用场景中都有事件强顺序保证的要求。

鉴于producer的这两个问题,应该如何规避呢?首先,对于消息丢失的问题,很容易想到的一个方案就是:既然异步发送可能丢失数据,改成同步发送似乎是一个不错的主意。比如这样:

    producer.send(record).get();

采用同步发送当然是可以的,但是性能会很差,并不推荐在实际场景中使用。因此最好能有一份配置,既使用异步方式还能有效地避免数据丢失,即使出现producer崩溃的情况也不会有问题。

首先给出producer端“无消息丢失配置”,然后再分别解释每个参数配置的含义。具体配置参数列表如下。

  • block.on.buffer.full=true
  • acks=all or -1
  • retries=Integer.MAX_VALUE
  • max.in.flight.requests.per.connection=1
  • 使用带回调机制的send发送消息,即KafkaProducer.send(record, callback)
  • Callback逻辑中显式地立即关闭producer,使用close(0)
  • unclean.leader.election.enable=false
  • replication.factor=3
  • min.insync.replicas=2
  • replication.factor>min.insync.replicas
  • enable.auto.commit=false

1、producer端配置

block.on.buffer.full = true

实际上这个参数在Kafka0.9.0.0版本已经被标记为“deprecated”,并使用max.block.ms参数替代,但这里还是推荐用户显式地设置它为true,使得内存缓冲区被填满时producer处于阻塞状态并停止接收新的消息而不是抛出异常;否则producer生产速度过快会耗尽缓冲区。新版本Kafka(0.10.0.0之后)可以不用理会这个参数,转而设置max.block.ms即可。

acks = all

设置acks为all很容易理解,即必须要等到所有follower都响应了发送消息才能认为提交成功,这是producer端最强程度的持久化保证。

retries = Integer.MAX_VALUE

设置成MAX VALUE纵然有些极端,但其实想表达的是producer要开启无限重试。用户不必担心producer会重试那些肯定无法恢复的错误,当前producer只会重试那些可恢复的异常情况,所以放心地设置一个比较大的值通常能很好地保证消息不丢失。

使用带有回调机制的send

不要使用KafkaProducer中单参数的send方法,因为该send调用仅仅是把消息发出而不会理会消息发送的结果。如果消息发送失败,该方法不会得到任何通知,故可能造成数据的丢失。实际环境中一定要使用带回调机制的send版本,即KafkaProducer.send(record,.callback)。

Callback逻辑中显式立即关闭producer

在Callback的失败处理逻辑中显式调用KafkaProducer.close(0)。这样做的目的是为了处理
消息的乱序问题。若不使用close(0),默认情况下producer会被允许将未完成的消息发送出去,这样就有可能造成消息乱序。

2、broker端配置

unclean.leader.election.enable = false

关闭unclean leader选举,即不允许非ISR中的副本被选举为leader,从而避免broker端因日志水位截断而造成的消息丢失。

replication.factor >= 3

设置成3主要是参考了Hadoop及业界通用的三备份原则,其实这里想强调的是一定要使用多个副本来保存分区的消息。

min.insync.replicas > 1

用于控制某条消息至少被写入到ISR中的多少个副本才算成功,设置成大于1是为了提升producer端发送语义的持久性。记住只有在producer端acks被设置成all或-l时,这个参数才有意义。在实际使用时,不要使用默认值。

确保replication.factor > min.insync.replicas

若两者相等,那么只要有一个副本挂掉,分区就无法正常工作,虽然有很高的持久性但可用性被极大地降低了。推荐配置成replication.factor=min.insyn.replicas+l。


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文