【万字长文】带你搞懂Kafka中的所有知识点
作者:mmseoamin日期:2024-02-03
目录
概述
主题和分区
日志
消息压缩
日志分段条件
日志清理
多副本
写入流程
生产者
必要参数配置
消息的发送
流程
元数据更新
重要的生产者参数
消费者
消费者组
分区分配策略
协调器
重平衡
触发方式
流程
如何避免rebalance
位移提交
消费者offset的存储
broker集群
控制器
事务
消息保障传输
幂等性
事务
概述
- Apache Kafka 是消息引擎系统,也是一个分布式流处理平台(Distributed Streaming Platform)
- 消息系统
- kafka 和传统的消息系统(也称作消息中间件〉都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、 可恢复性等功能。与此同时, Kafka供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
- 存储系统
- Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险 也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可
- 流式处理平台
- Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作
- 整个 Kafka 体系结构中引入了以下3个术语。
- Producer 生产者:也就是发送消息的一方。生产者负责创建消息 然后将其投递到Kafka
- Consumer 消费者:也就是接收消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理
- Broker :服务代理节点。对于 Kafka 而言, Broker 可以简单地看作一个独立的 Kafka服务节点或 Kafka 服务实例。大多数情况下也可以将 Broker 看作一台 Kafka 服务器,前提是这台服务器上只部署了一个 Kafka 实例。一个或多个 Broker 组成了 一个 Kafka 集群。一般而言,我们更习惯使用首字母小写的 broker 来表示服务代理节点Kafka 中还有两个特别重要的概念
- 主题( Topic )与分区( Partition ): Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区( Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志( Log )文件,消息在被追加到分区日志、文件的时候都会分配一个特定的偏移量( offset )。 offset 是消息在分区中的唯一标识, Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说, Kafka保证的是分区有序而不是主题有序。
主题和分区
- Kafka 的消息通过主题进行分类。主题可以被分为若干个分区,一个分区对应多个日志文件。消息以追加的方式写入分区,然后以先入先出的顺序读取。要注意,由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。Kafka 通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供比单个服务器更强大的性能。
- 主题和分区是逻辑上的概念,同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志( Log )文件,消息在被追加到分区日志、文件的时候都会分配一个特定的偏移量( offset )。 offset 是消息在分区中的唯一标识, Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说, Kafka 保证的是分区有序而不是主题有序。
- 每一条消息被发送到 broker之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器I/O将会成为这个主题的性能瓶颈,而分区解决了这个问题创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。
日志
- Kafka中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每个主题又可以分为一个或多个分区,分区的数量可以在主题创建的时候指定,也可以在之后修改。每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配一个唯一的偏移量(offset)。
- 如果分区规则设置得合理,那么所有的消息可以均匀地分布到不同的分区中,这样就可以实现水平扩展。不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka又引入了日志分段(LogSegment)的概念。
- 将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log 和LogSegment也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以“.txnindex”为后缀的事务索引文件)
- 向Log 中追加消息时是顺序写入的,只有最后一个 LogSegment 才能执行写入操作,在此之前所有的 LogSegment 都不能写入数据。为了方便描述,我们将最后一个LogSegment称为“activeSegment”,即表示当前活跃的日志分段。随着消息的不断写入,当activeSegment满足一定的条件时,就需要创建新的activeSegment,之后追加的消息将写入新的activeSegment。
- 为了便于消息的检索,每个LogSegment中的日志文件(以“.log”为文件后缀)都有对应的两个索引文件
- 偏移量索引文件(以".index”为文件后缀)和时间戳索引文件(以".timeindex”为文件后缀)。
- 每个 LogSegment 都有一个基准偏移量baseOffset,用来表示当前LogSegment中第一条消息的offsct。偏移量是一个64位的长整型数
- 日志文件和两个索引文件都是根据基准偏移量(baseOffset)命名的,名称固定为20位数字,没有达到的位数则用0填充。比如第一个LogSegment的基准偏移量为0,对应的日志文件为00000000000000000000.log
-
消息压缩
- Kafka实现的压缩方式是将多条消息一起进行压缩,这样可以保证较好的压缩效果。在一般情况下,生产者发送的压缩数据在broker中也是保持压缩状态进行存储的,消费者从服务端获取的也是压缩的消息,消费者在处理消息之前才会解压消息,这样保持了端到端的压缩。
- Kafka日志中使用哪种压缩方式是通过参数 compression.type来配置的,默认值为“producer”,表示保留生产者使用的压缩方式。这个参数还可以配置为“gzip”"snappy”“LZ4”,分别对应GZIP、SNAPPY、LZ4这3种压缩算法。如果参数compression.type配置为“uncompressed”,则表示不压缩。
-
日志分段条件
- 当前日志分段文件的大小超过了broker端参数log.segment. bytes 配置的值。log.segment.bytes参数的默认值为1GB.
- 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms或log.roll.hours参数配置的值。如果同时配置了log.roll.ms和log.roll.hours参数,那么log.roll.ms的优先级高。默认情况下,只配置了log.roll.hours参数,其值为168,即7天。
- 偏移量索引文件或时间戳索引文件的大小达到 broker端参数log.index.size.max.bytes配置的值。log.index.size.max. bytes 的默认值为10MB。
- 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量(offset - baseOffset > Integer.MAX_VALUE)。
-
日志清理
- 提供了日志删除、日志压缩两种清理策略,默认采用日志删除策略
- 日志删除
- 基于时间
- 日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments),retentionMs可以通过broker端参数log.retention.hours、log.retention.minutes和log.retention.ms来配置,其中 log.retention.ms的优先级最高,log.retention.minutes次之,log.retention.hours最低。默认情况下只配置了log.retention.hours参数,其值为168,故默认情况下日志分段文件的保留时间为7天。
- 基于大小
- 日志删除任务会检查当前日志的大小是否超过设定的阈值(retentionSize)来寻找可删除的日志分段的文件集合(deletableSegments),retentionSize可以通过 broker端参数 log.retention.bytes来配置,默认值为-1,表示无穷大。注意log.retention.bytes配置的是Log 中所有日志文件的总大小,而不是单个日志分段(确切地说应该为.log日志文件)的大小。单个日志分段的大小由 broker端参数log.segment.bytes来限制,默认值为1073741824,即 1GB。
- 基于起始偏移量
- 日志压缩
- Log Compaction对于有相同key的不同value值,只保留最后一个版本。如果应用只关心key对应的最新value值,则可以开启Kafka的日志清理功能,Kafka会定期将相同key的消息进行合并,只保留最新的value值。
多副本
- 通过增加副本数量可以提升容灾能力。同时通过多副本机制实现故障自动转移。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是一主多从的关系,其中 leader 副本负 处理读写请求 follower 副本只负责与 leader 副本的消息同步。副本处于不同的 broker ,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 本对外提供服务。 Kafka 通过多副本机制实现了故障的自动转移,当 Kafka 集群中某个 broker 失效时仍然能保证服务可用
- leader副本
- 负责处理读写请求,生产者和消费者之和leader副本进行交互
- follower副本
- follower副本只负责与leader副本的消息同步
- AR(Assigned Replicas)
- 分区中的所有副本统称为 AR,AR=ISR+OSR
- ISR( In-Sync Replicas)
- 所有与 leader 副本保持一定程度同步的副本(包括 leader 副本在内)组成 ISR , ISR 集合是 AR 集合中的一个子集。消息会先发送到 leader副本,然后 follower副本才能从 leader 副本中拉取消息进行同步,同步期间内 follower 副本相对于 leader 副本而言会有 定程度的滞后,前面所说的一定程度的同步是指可忍受的滞后范围,这个范围可以通过参数进行配置。
- leader 副本负责维护和跟踪 ISR 集合中所有 follower 的滞后状态, follower副本落后或失效时, leader副本会把它从 ISR 集合中剔除 OSR 集合中有 follower 副本追上leader 副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合。追赶上leader副本的条件是此副本的 LEO 否不leader副本的HW。默认情况下,leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的 leader 而在 OSR 集合中的副没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)
- OSR(Out-of-Sync Replicas)
- leader副本同步滞后过多的副本(不包 leader 副本)组成 OSR 。 追赶上 leader 副本的定准则是此副本的 LEO 否不 eader 副本的 HW
- LEO
- 是Log End Offset 缩写,它标识当前日志文件中下一条待写入消息 offset ,LEO 的大小相当于当前日志分区中最后一条消息的offst值加1。分区 ISR 集合中的每个副本都会维护自身的 LEO ,而ISR集合中最小的LEO即为分区的HW ,对消费者而言只能消费 HW 之前的消息
- HW
- HW是High Watermark 的缩写,俗称高水位,它标识个特定的消息偏移量(offset),消费者只能拉取到这个offset前的消息
-
写入流程
- 生产者客户端发送消息至leader副本,消息被迫加到leader副本的本地日志,并且会更新日志的偏移量。
- 某一时刻,leader副本的LEO增加至5,并且所有副本的HW还都为0
- 之后follower副本(不带阴影的方框)向leader副本拉取消息,在拉取的请求中会带有自身的LEO信息,这个LEO信息对应的是 FetchRequest 请求中的 fetch_offset。
- leader副本返回给follower副本相应的消息,并且还带有自身的HW信息,对应的是 fetchResponse 中的 high_watermark
- 此时两个 follower 副本各自拉取到了消息,并更新各自的LEO。与此同时,follower副本还会更新自己的 HW ,更新HW的算法是比较当前LEO和leader副本中传送过来的HW,取较小值作为自己HW。当前两个follower副本HW都等于0
- 接下来follower副本再次请求拉取leader 副本中的消息,如图8-6所示。
- 此时leader副本收到来自follower副本的FetchRequest请求,其中带有LEO的相关信息,选取其中的最小值作为新的HW,即 min(15,3,4)=3。然后连同消息和HW一起返回FetchResponse给follower副本,如图8-7所示。注意leader副本的HW是一个很重要的东西,因为它直接影响了分区数据对消费者的可见性。
- 两个follower副本在收到新的消息之后更新LEO并且更新自己的HW为3( min(LEO,3)=3)。
- 在一个分区中,leader副本所在的节点会记录所有副本的LEO,而follower副本所在的节点只会记录自身的LEO,而不会记录其他副本的LEO。对HW而言,各个副本所在的节点都只记录它自身的HW。变更图8-3,使其带有相应的LEO和HW信息,如图8-8所示。leader副本中带有其他follower副本的LEO,那么它们是什么时候更新的呢? leader副本收到 follower副本的FetchRequest 请求之后,它首先会从自己的日志文件中读取数据,然后在返回给follower副本数据前先更新follower副本的LEO。
生产者
-
必要参数配置
- bootstrap.servers
- 该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1 , host2 :port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的 broker地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的 broker地址信息,当其中任意一个宕机时,生产者仍然可以连接到Kafka集群上。有关此参数的更多释义可以参考6.5.2节。
- key.serializer和value.serializer(see瑞额来ze儿)
- broker端接收的消息必须以字节数组(byte[])的形式存在。生产者客户端使用这种方式可以让代码具有良好的可读性,不过在发往broker之前需要知道如何将消息中对应的key和value做相应的序列化操作来转换成字节数组。使用此参数来指定key和value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名。
-
消息的发送
- 在创建完生产者实例之后,接下来的工作就是构建消息,即创建ProducerRecord对象。
- 创建生产者实例和构建消息之后,就可以开始发送消息了。发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async) 。
- 发后即忘
- producer.send(record);
- 只管往 Kafka 发送 并不关心消息是否正确到达。
- 同步
- producer.send(record).get();
- 同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条。
- 异步
- producer.send(record, ()->{/*...*/});
- 实际上 send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。
- 一般是在 send()方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。
- 这样可以获取一个RecordMetadata对象,在RecordMetadata对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。如果在应用代码中需要这些信息,则可以使用这个方式。
- 发送完成后要调用close()回收所占用的资源
- close()方法会阻塞等待之前所有发送请求完成后再关KafkaProducer 。与此同时,KafkaProducer还提供了一个带超时时间的 close() 方法
- 只会等待timeout时间内来完成所有尚未完成的请求处理,然后强行退出。
-
流程
- 整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender线程负责从RecordAccumulator 中获取消息并将其发送到Kafka中。
- 拦截器
- 生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
- KafkaProducer 中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照interceptor.classes参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)。
- 序列化器
- 生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从Kafka 中收到的字节数组转换成相应的对象。
- 除了用于String类型的序列化器,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口。
- 生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的,如果生产者使用了某种序列化器,比如StringSerializer,而消费者使用了另一种序列化器,比如IntegerSerializer,那么是无法解析出想要的数据的。
- 分区器
- 消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。
- 如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
- 在默认分区器 DefaultPartitioner 的实现中,如果 key不为 null,那么默认的分区器会对key进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值和分区数取余来计算分区号,拥有相同key的消息会被写入同一个分区。如果 key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。
- 如果key不为null,那么计算得到的分区号会是所有分区中的任意一个。如果key为null,那么计算得到的分区号仅为可用分区中的任意一个,注意两者之间的差别。
- 在不改变主题分区数量的情况下,key 与分区之间的映射可以保持不变。一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了。
- RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator缓存的大小可以通过生产者客户端参数buffer.memory配置,默认32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer 的send()方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms的配置,此参数的默认值为60000,即60秒。
- 主线程中发送过来的消息都会被追加到RecordAccumulator 的某个双端队列(Deque)中,在RecordAccumulator的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque。消息写入缓存时,追加到双端队列的尾部。Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个ProducerRecord。通俗地说,ProducerRecord是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧凑。与此同时,将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch和消息的具体格式有关,如果生产者客户端需要向很多分区发送消息,则可以将buffer .memory参数适当调大以增加整体的吞吐量。
- 消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka 生产者客户端中,通过java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进 BufferPool中,这个特定的大小由batch.size参数来指定,默认值为16384B,即 16KB。我们可以适当地调大batch.size参数以便多缓存一些消息。
- ProducerBatch的大小和batch.size参数也有着密切的关系。当一条消息(ProducerRecord>流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个ProducerBatch(如果没有则新建),查看ProducerBatch中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过 batch.size参数的大小,如果不超过,那么就以 batch.size参数的大小来创建 ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool的管理来进行复用。如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。
- Sender从RecordAccumulator中获取缓存的消息之后,会进一步将原本>的保存形式转变成的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的broker节点发送消息,而并不关心消息属于哪一个分区,而对于KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络IO层面的转换。
- 在转换成>的形式之后,Sender还会进一步封装成的形式,这样就可以将Request请求发往各个Node了,这里的Request 是指Kafka的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest,
- 请求在从 Sender 线程发往Kafka之前还会保存到InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map>,它的主要作用是缓存了已经发出去但还没有收到响应的请求(Nodeld是一个String 类型,表示节点的 id 编号)与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.flight.requests.per. connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
-
元数据更新
- 元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
- 当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms 的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出 leastLoadedNode(NODE中负载最小的节点),然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和 final关键字来保障。
-
重要的生产者参数
- acks
- 这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks参数有3种类型的值(都是字符串类型)。
- acks = 1。默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入leader 副本,比如在leader副本崩溃、重新选举新的leader副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息。acks设置为1,是消息可靠性和吞吐量之间的折中方案。
- acks =0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入Kafka的过程中出现某些异常,导致Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks设置为О可以达到最大的吞吐量。
- acks =-1或acks = all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks设置为-1 (all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks=1的情况。要获得更高的消息可靠性需要配合min.insync.replicas 等参数的联动,
- max.request.size
- 这个参数用来限制生产者客户端能发送的消息的最大值,默认1MB。一般情况下,这个默认值就可以满足大多数的应用场景了。笔者并不建议读者盲目地增大这个参数的配置值,尤其是在对Kafka整体脉络没有足够把控的时候。因为这个参数还涉及一些其他参数的联动,比如 broker端的message.max.bytes参数,如果配置错误可能会引起一些不必要的异常。比如将broker端的message.max.bytes参数配置为10,而max.request.size参数配置为20,那么当我们发送一条大小为15B的消息时,生产者客户端就会报出如下的异常:
- buffer.memory
- 生产者内存缓冲区(RecordAccumulator)的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候 send() 方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffer.full/max.block.ms 参数
- retries
- 生产者可以重发消息的次数,默认为0,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔
- batch.size
- 指定了一个批次可以使用的内存大小默认16K,按照字节数计算(而不是消息个数)。当批次被填满, 批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大, 也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销
- linger.ms
- 指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数, 让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)
- max.in.flight.requests.per.connection
- 生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
- max.block.ms
- 调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常
- max.request.size
- request.timeout.ms
- metadata.fetch.timeout.ms
- 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器 返回响应的时间
- receive.buffer.bytes 和 send.buffer.bytes
- 指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1, 就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以 适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽
消费者
- 负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。
-
消费者组
- 与其他一些消息中间件不同的是,在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。
- 消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(或减少)消费者的个数来提高(或降低)整体的消费能力。对于分区数固定的情况,一味地增加消费者并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。
- 点对点模型
- 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
- 发布/订阅模型
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。
-
分区分配策略
- Range分配策略
- RangeAssignor分配策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每一个主题,RangeAssignor策略会将消费组内所有订阅这个主题的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
- 比如2个消费者,两个主题,每个主题有3个分区。那么就会变成3 / 2无法整除,就会变成一个消费者负责4个分区,另外一个负责2个分区
- c0: t0p0、t0p1、t1p0、t1p1
- c0: t0p2、t1p2
- RoundRobin分配策略
- RoundRobin分配策略的原理是将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。RoundRobin分配策略对应的 partition.assignment.strategy参数值为 org.apache.kafka.clients.consumer.RoundRobinAssignor。
- 如果同一个消费组内所有的消费者的订阅信息都是相同的,那么RoundRobinAssignor分配策略的分区分配会是均匀的。
- Sticky分配策略
- 分区的分配要尽可能均匀。
- 分区的分配尽可能与上次分配的保持相同。减少不必要的分区移动(
- 自定义分配策略
-
协调器
- 消费者协调器
- 更新元数据,加入/退出消费组,提交偏移量给GroupCoordinator,分区分配。不同消费者之间是看不到彼此的 ConsumerCoordinator 中的信息的。一个Consumer实例对应一个ConsumerCoordinator。
- 组协调器
- 每一个Consumer Group,都会有一个Coordinator节点(由某个Broker充当)负责消费组leader选举,处理提交偏移量请求,清理过期信息。
- 消费者协调器通过和组协调器发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询获取消息或提交偏移量时发送心跳。
- 如果消费者停止发送心跳的时间足够长,会话就会过期,组协调器认为它已经死亡,就会触发一次再均衡。
-
重平衡
-
触发方式
- 新的消费者加入
- 消费者协调器发送心跳超时
- 消费者退出
- 消费者订阅主题、分区发生变化
- 组协调器节点发生变化
-
流程
- 第一阶段:消费者查找要加入的消费者组的组协调器所在的borker。
- 第二阶段:所有消费者组成员需要重新入组,向组协调器发送请求(joinGroupRequest),携带自己的订阅信息和分区分配策略和一些元数据信息,收集到全员的joinGroup请求后,组协调器会从这些成员中选出担任leader的消费者(通常情况下第一个发送joinGroup的成员成为leader),选出leader后协调器会在给leader的响应(joinGroupResponse)体中携带组内成员的订阅信息和分配策略(只有leader的响应会携带,其他组员的响应体为空),leader接收到后会制定分配方案。
- 第三阶段:leader向组协调器发送syncGroup请求,将分配方案发送给协调者,其他成员也会向协调器发送syncGroup请求只不过没有任何内容,然后协调器统一以syncGroupResponse响应的形式通知所有组成员他们分配到的分区。
- 第四阶段:消费者重新拉取订阅分区的最新提交的位移偏移量,并开始消费。
-
如何避免rebalance
- 业务需要:(变更tpoic或partition)
- 合理设置消费者参数:
- 避免心跳超时引起的误判:
- session.timeout.ms: 一次session的连接超时时间
- heartbeat.interval.ms:心跳时间,一般为超时时间的1/3,Consumer在被判定为死亡之前,能够发送至少 3 轮的心跳请求
- 避免消费超时:
- max.poll.interval.ms: 每隔多长时间去拉取消息。合理设置预期值,否则就会被coordinator判定为死亡,踢出Consumer Group,进行Rebalance
- max.poll.records: 一次从拉取出来的数据条数。根据消费业务处理耗费时长合理设置,如果每次max.poll.interval.ms 设置的时间较短,可以max.poll.records设置小点儿,少拉取些,这样不会超时
- 尽量在设计时对业务后期发展及资源进行预估,减少后期因业务拓展引起的变更;
- 如果必须变更,在变更前评估影响,做出预案
- 反序列化
- KafkaProducer对应的序列化器,那么与此对应的KafkaConsumer就会有反序列化器。Kafka所提供的反序列化器有ByteBufferDeserializer、 ByteArrayDeserializer,BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer,它们分别用于ByteBuffer、 ByteArray、Bytes、Double、Float、Integer、Long、Short 及 String类型的反序列化,这些序列化器也都实现了Deserializer接口。
- 消费数据
- Kafka中的消费是基于拉模式的。Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。
- 对于poll()方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空,如果订阅的所有分区中都没有可供消费的消息,那么poll()方法返回为空的消息集合。
- 注意到 poll()方法里还有一个超时时间参数 time out ,用来控制 poll()方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞 。
- poll()方法的返回值类型是ConsumerRecords,它用来表示一次拉取操作所获得的消息集,内部包含了若干ConsumerRecord,它提供了一个iterator()方法来循环遍历消息集内部的消息。
- 消费者消费到的每条消息的类型为ConsumerRecord,这个和生产者发送的消息类型ProducerRecord相对应,不过ConsumerRecord中的内容更加丰富。
- topic 和 partition这两个字段分别代表消息所属主题的名称和所在分区的编号。
- offset表示消息在所属分区的偏移量。
- timestamp表示时间戳,与此对应的timestampType表示时间戳的类型。timestampType 有两种类型:
- createTime和LogAppendTime,分别代表消息创建的时间戳和消息追加到日志的时间戳。
- headers表示消息的头部内容。
- key和value分别表示消息的键和消息的值,一般业务应用要读取的就是value,
- 我们在消费消息的时候可以直接对ConsumerRecord中感兴趣的字段进行具体的业务逻辑处理。
-
位移提交
- 消费者的offset信息,0.9.0版本以后,维护在kafka的_consumer_offsets这个topic中,_consumer_offsets初始并不存在,当第一个消费者消费数据后才会自动创建。
- key为group.id+topic+分区号,value为当前offset的值
- 每个group.id+topic+分区号就保留最新数据,因为每隔一段时间,Kafka内部会对这个topic进行compact
- 为什么消费者offeset不存在zk中?
- zk是个cp系统,性能较差
- offset提交频繁,需要频繁变更offset值(topic_partition_group = offset)
- 自动提交
- 在Kafka中默认的消费位移的提交方式是自动提交,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
- 手动提交
- 同步提交
- 使用 commitSync(),只要没有发生不可恢复的错误,commitSync() 方法会一直尝试直至提交成功。如果提交失败,我们也只能把异常记录到错误日志里
- 异步提交
- 使用 commitASync(),在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取 。异步提交以便消费者的性能得到一定的增强。
- 提交特定的偏移量
- 指定位移消费
- seek()方法中的参数partition表示分区,而offset参数用来指定从分区的哪个位置开始消费。seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在poll()方法的调用过程中实现的。也就是说,在执行seek()方法之前需要先执行一次 poll()方法,等到分配到分区之后才可以重置消费位置。
- 拦截器
- KafkaConsumer 会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行相应的定制化操作,比如修改返回的消息内容、按照某种规则过滤消息(可能会减少poll()方法返回的消息的个数)。如果 onConsume()方法中抛出异常,那么会被捕获并记录到日志中,但是异常不会再向上传递。
- KafkaConsumer 会在提交完消费位移之后调用拦截器的onCommit()方法,可以使用这个方法来记录跟踪所提交的位移信息,比如当消费者使用commitSync的无参方法时,我们不知道提交的消费位移的具体细节,而使用拦截器的onCommit()方法却可以做到这一点。
-
消费者offset的存储
- 0.9.0版本以后,维护在kafka的_consumer_offsets这个topic中
- key为group.id+topic+分区号,value为当前offset的值
- 每个group.id+topic+分区号就保留最新数据,因为每隔一段时间,Kafka内部会对这个topic进行compact
- 为什么消费者offeset不存在zk中?
- zk是个cp系统,性能较差
- offset提交频繁,需要频繁变更新offset值(topic_partition_group = offset)
broker集群
-
控制器
- 在Kafka 集群中会有一个或多个broker,其中有一个broker会被选举为控制器(KafkaController),它负责管理整个集群中所有分区和副本的状态。
- 当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。
- 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
- 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
- Broker 在启动时,会尝试去ZooKeeper中创建 /controller节点。Kafka当前选举控制器的规则是:第一个成功创建 /controller 节点的Broker会被指定为控制器。
- leader选举
- 按照AR集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。一个分区的AR集合在分配的时候就被指定,并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。
- 如果ISR集合中没有可用的副本,那么此时还要再检查一下所配置的unclean.leader.election.enable参数(默认值为 false)。如果这个参数配置为true,那么表示允许从非ISR列表中的选举leader,从AR列表中找到第一个存活的副本即为leader。
- 可以使用kafka-server-stop.sh脚本优雅关闭kafka
- borker.id
- broker.id是broker在启动之前必须设定的参数之一,在Kafka集群中,每个broker都有唯一的 id (也可以记作 brokerId)值用来区分彼此。broker在启动时会在ZooKeeper中的/brokers/ids路径下创建一个以当前brokerId为名称的虚节点,broker的健康状态检查就依赖于此虚节点。当broker下线时,该虚节点会自动删除,其他broker节点或客户端通过判断/brokers/ids路径下是否有此 broker 的 brokerld 节点来确定该broker 的健康状态。
- 可以通过broker端的配置文件 config/server.properties 里的 broker.id 参数来配置brokerId,默认情况下broker.id值为-1。在Kafka中,brokerld值必须大于等于0才有可能正常启动,但这里并不是只能通过配置文件config/server.properties来设定这个值,还可以通过meta.properties 文件或自动生成功能来实现。
事务
-
消息保障传输
- 最多一次(at most once)
- 至少一次(at least once)&使用场景(Kafka 默认实现方式)
- 因为ISR机制,Kafka消息一旦提交成功(产生副本之后),这条消息近乎可以认为是不可能丢失的,所以至少一次被消费。
- ack=1或-1,生产者设置重试
- 精准一次(exactly once)&使用场景(Kafka 支持机制)
- 精确一次指的是消息被处理且只会被处理一次。在0.11.0.0 之后支持事务和幂等性之后,使用较广的就变成精确一次了。
-
幂等性
- 首先幂等性的概念:若一个操作执行n次(n>1)与执行一次的结果是相同的,那么这个操作就是幂等操作。
- 在producer端,当出现发送消息无响应或者响应超时之后,不管消息成功没,都会有一个重试策略,这就导致了消息的重复提交问题。
- Kafka提供了一个enable.idempotent参数,设置为true时,就开启幂等了。
- 幂等的实现方式是给所提交的消息都赋予一个单调递增的序列号用于消息去重,这个序列号不会舍弃,始终随消息持久化保存,可以简单的理解为消息的一部分。这么做的目的是防止leader副本挂掉之后,没法进行去重操作。并且每个生产者初始化时都会分配一个pid(严格单调递增的),broker端会在内存中为每一对<PID ,分区>维护一个序列号,生产者每发送一条消息就会将<PID,分区>对应的序列号的值加1。
- 当发送消息的序列号小于或者等于broker端保存的序列号时,broker就会拒绝这条消息。 当实现上述的Idempotent producer就保证了消息可以重试n次直到提交成功,并且提交多次也仅会成功保存一次,进而从producer端保证了,消息只会被成功提交一次。
-
事务
- 幂等性并不能跨多个分区运作,而事务可以弥补这个缺陷 。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能
- kafka事务使用前必须开启幂等性
- transactionalld 与PID一一对应,两者之间所不同的是transactionalld由用户显式设置,而PID是由Kafka内部分配的。另外,为了保证新的生产者启动后具有相同transactionalld 的旧生产者能够立即失效,每个生产者通过transactionalld获取PID的同时,还会获取一个单调递增的producer epoch (对应下面要讲述的
- 为了实现事务的功能,Kafka还引入了事务协调器(TransactionCoordinator)来负责处理事务,这一点可以类比一下组协调器(GroupCoordinator)。每一个生产者都会被指派一个特定的TransactionCoordinator,所有的事务逻辑包括分派PID等都是由TransactionCoordinator 来负责实施的。TransactionCoordinator 会将事务状态持久化到内部主题_transaction_state 中。
- 备注
- transactionalId 与 PID 一一对应,为了保证新的 Producer 启动之后,具有相同的 transactionalId 的旧生产者立即失效,每个 Producer 通过 transactionalId 获取 PID 的时候,还会获取一个单调递增的 producer epoch。
- Kafka 的事务主要是针对 Producer 而言的。对于 Consumer,考虑到日志压缩(相同 Key 的日志被新消息覆盖)、可追溯的 seek() 等原因,Consumer 关于事务语义较弱。
- 对于 Kafka Consumer,在实现事务配置时,一定要关闭自动提交的选项。
- 流程
- 查找事务协调器
- 为生产者分配PID(幂等性)
- 不包含 transactionId:直接生成一个新的 Producer ID,返回给生产者客户端
- 包含 transactionId:根据 transactionId 获取 PID,这个对应关系保存在事务日志中
- 消费-转换-生产
- 存储对应关系,通过请求增加分区
- Producer 在向新分区发送数据之前,首先向 TransactionalCoordinator 发送请求,使 TransactionalCoordinator 存储对应关系 (transactionalId, TopicPartition) 到主题 __transaction_state 中。
- 生产者发送消息
- 基本与普通的发送消息相同,生产者调用 producer.send() 方法,发送数据到分区;
- 发送的请求中,包含 pid, epoch, sequence number 字段;
- 增加消费 offset 到事务
- 生产者通过 producer.senOffsetsToTransaction() 接口,发送分区的 Offset 信息到事务协调者,协调者将分区信息增加到事务中;
- 事务提交位移
- 在前面生产者调用事务提交 offset 接口后,会发送一个 TxnOffsetCommitRequest 请求到消费组协调者,消费组协调者会把 offset 存储到 Kafka 内部主题 __consumer_offsets 中。协调者会根据请求的 pid 与 epoch 验证生产者是否允许发起这个请求。
- epoch:生产者用于标识同一个事务 ID 在一次事务中的轮数,每次初始化事务的时候,都会递增,从而让服务端知道生产者请求是否为旧的请求。
- 只有当事务提交之后,offset 才会对外可见。
- 提交或回滚事务
- 用户调用 producer.commitTransaction() 或 abortTransaction() 方法,提交或回滚事务;
- EndTxnRequest:生产者完成事务之后,客户端需要显式调用结束事务,或者回滚事务。前者使消息对消费者可见,后者使消息标记为 abort 状态,对消费者不可见。无论提交或者回滚,都会发送一个 EndTxnRequest 请求到事务协调者,同时写入 PREPARE_COMMIT 或者 PREPARE_ABORT 信息到事务记录日志中。
- WriteTxnMarkerRequest:事务协调者收到 EndTxnRequest 之后,其中包含消息是否对消费者可见的信息,然后就需要向事务中各分区的 Leader 发送消息,告知消费者当前消息时哪个事务,该消息应该接受还是丢弃。每个 Broker 收到请求之后,会把该消息应该 commit 或 abort 的信息写到数据日志中。
- 开启事务
- 生产者通过方法 producer.beginTransaction() 启动事务,此时只是生产者内部状态记录为事务开始。对于事务协调者,直到生产者发送第一条消息,才认为已经发起了事务。