当生产者将消息发送到Kafka集群后,会转发给消费者进行消费。消息的消费模型有两种,推送模式(push)和拉取模式(pull)。
消息的推送模式需要记录消费者的消费状态。当把一条消息推送给消费者后,需要维护消息的状态,如标记这条消息已经被消费,这种方式无法很好地保证消息被处理。如果要保证消息被处理,发送完消息后,需要将其状态设置为“已发送”。收到消费者的确认收到消息后,才将其状态更新为“已消费”,这就需要我们记录所有消息的消费状态。显然这种方式不可取。这种方式还存在一个明显的缺点,就是消息被标记为“已消费”后,其他消费者就不能再进行消费了。
由于推送模式存在一定的缺点,因此Kafka采用消费拉取的模式来消费消息。由每个消费者维护自己的消费状态,并且每个消费者互相独立地顺序拉取每个分区的消息。消费者通过偏移量的信息来控制从Kafka中消费的消息
由消费者通过偏移量进行消费控制的优点在于,消费者可以按照任意的顺序消费消息。例如,消费者可以通过重置偏移量信息,重新处理之前已经消费过的消息;或者直接跳转到某一个偏移量位置,并开始消费。
如果消费者已经将消息进行了消费,Kafka并不会立即将消息删除,而是会将所有消息进行保存,即持久化保存到Kafka的消息日志中。无论消息有没有被消费,用户可以通过设置保留时间来清理过期的消息数据。
由于消息的发送速率是由Kafka的Broker决定的,Broker的目标是尽可能以最快的速度传递消息。所以在推送模式下,很难适应消费速率不同的消费者,从而造成消费者来不及处理消息。消费者来不及处理消息就可能造成消息的阻塞,从而降低系统的处理能力。
在拉取模式下,用户可以根据消费者的处理能力调整消息消费的速率,但在这种模式下也存在一定的缺点。如果消息的生产者没有产生消息,就可能造成消费者陷入循环中,一直等待数据到达。为了避免这种情况出现,可以在拉取过程中指定允许消费者在等待数据到达时进行阻塞,并且还可以指定消费的字节数,从而保证传输时的数据量。
假设我们有一个应用程序,它从一个 Kafka 主题读取消息,在对这些消息做一些验证后再把它们保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息、验证消息和保存结果。但过了一阵子,生产者向主题写入消息的速度超过了应用程序验证数据的速度,这时候该怎么办呢?如果只使用单个消费者来处理消息,那么应用程序会远远跟不上消息生成的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的主题写入消息一样,也可以让多个消费者从同一个主题读取消息。
Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者负责读取这个主题的部分消息。
Kafka 消费者经常需要执行一些高延迟的操作,比如把数据写到数据库或用数据做一些比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,因此可以增加更多的消费者来分担负载,让每个消费者只处理部分分区的消息,这是横向扩展消费者的主要方式。于是,我们可以为主题创建大量的分区,当负载急剧增长时,可以加入更多的消费者。不过需要注意的是,不要让消费者的数量超过主题分区的数量,因为多余的消费者只会被闲置。
除了通过增加消费者数量来横向伸缩单个应用程序,我们还经常遇到多个应用程序从同一个主题读取数据的情况。实际上,Kafka 的一个主要设计目标是让 Kafka 主题里的数据能够满足企业各种应用场景的需求。在这些应用场景中,我们希望每一个应用程序都能获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序都有自己的消费者群组就可以让它们获取到所有的消息。不同于传统的消息系统,横向伸缩消费者和消费者群组并不会导致 Kafka 性能下降。
消费者群组里的消费者共享主题分区的所有权。当一个新消费者加入群组时,它将开始读取一部分原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它将离开群组,原本由它读取的分区将由群组里的其他消费者读取。主题发生变化(比如管理员添加了新分区)会导致分区重分配。分区的所有权从一个消费者转移到另一个消费者的行为称为再均衡。再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(你可以放心地添加或移除消费者)。不过,在正常情况下,我们并不希望发生再均衡。
根据消费者群组所使用的分区分配策略的不同,再均衡可以分为两种类型。
在进行主动再均衡期间,所有消费者都会停止读取消息,放弃分区所有权,重新加入消费者群组,并获得重新分配到的分区。这样会导致整个消费者群组在一个很短的时间窗口内不可用。这个时间窗口的长短取决于消费者群组的大小和几个配置参数。
协作再均衡(也称为增量再均衡)通常是指将一个消费者的部分分区重新分配给另一个消费者,其他消费者则继续读取没有被重新分配的分区。这种再均衡包含两个或多个阶段。在第一个阶段,消费者群组首领会通知所有消费者,它们将失去部分分区的所有权,然后消费者会停止读取这些分区,并放弃对它们的所有权。在第二个阶段,消费者群组首领会将这些没有所有权的分区分配给其他消费者。虽然这种增量再均衡可能需要进行几次迭代,直到达到稳定状态,但它避免了主动再均衡中出现的“停止世界”停顿。这对大型消费者群组来说尤为重要,因为它们的再均衡可能需要很长时间。
消费者会向被指定为群组协调器的 broker(不同消费者群组的协调器可能不同)发送心跳,以此来保持群组成员关系和对分区的所有权关系。心跳是由消费者的一个后台线程发送的,只要消费者能够以正常的时间间隔发送心跳,它就会被认为还“活着”。如果消费者在足够长的一段时间内没有发送心跳,那么它的会话就将超时,群组协调器会认为它已经“死亡”,进而触发再均衡。如果一个消费者发生崩溃并停止读取消息,那么群组协调器就会在几秒内收不到心跳,它会认为消费者已经“死亡”,进而触发再均衡。在这几秒时间里,“死掉”的消费者不会读取分区里的消息。在关闭消费者后,协调器会立即触发一次再均衡,尽量降低处理延迟。本章的后续部分将介绍一些用于控制心跳发送频率、会话过期时间和调节消费者行为的配置参数。
Kafka的Topic是由分区组成的,并且还可以配置分区的冗余度。一个分区在多个Broker中选举出一个Leader,消费者只访问这个Leader的分区副本。
消费者组订阅Topic,意味着该Topic下的所有分区都会被消费者组中的消费者消费,如果按照从属关系来说,Topic下的每个分区只属于消费者组中的一个消费者,不可能出现组中的两个消费者负责同一个分区。
Kafka通过配置消费者分区分配策略来决定分区中的消息被哪一个消费者消费。消费者分区的分配策略都应该实现org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor接口。通过实现这个接口,用户可以自定义分区分配策略。Kafka提供了3种实现的方式,可以通过参数partition.assignment.strategy进行指定。
默认的分区分配策略。这种分配策略是根据Kafka Consumer端的总数和Topic中的分区总数来获取一个范围的,然后将分区按照范围进行平均分配,以保证分区尽可能均匀地分配给所有消费者。
这种分区分配策略对应的partition.assignment.strategy参数值为:org.apache.kafka. clients.consumer.RoundRobinAssignor。这种方式将Consumer Group中的所有消费者及其订阅Topic的分区按照字典序列排序,然后通过轮询的方式逐个将分区分配给每个消费者。
这种分区分配策略采用黏性分配策略,该策略从Kafka 0.11版本引入。所谓黏性分配策略,既要保证分区的分配要尽可能均匀,又要保证每次分区的分配尽可能与上次分配的保持相同,就像进行粘贴一样。如果这两点发生冲突,优先考虑第一点,即分区的分配要尽可能均匀。
要从Kafka消息集群中读取消息,需要先创建一个KafkaConsumer对象。创建KafkaConsumer对象与创建KafkaProducer对象非常相似。一般只需要指定以下三个必要的参数。
该参数指定了Kafka集群的连接字符串,它的用途与在KafkaProducer中的用途是一样的。
该参数与生产者中的key.serializer参数含义类似。消费者从Kafka消息集群上获取的任何消息都是字节数组的格式,因此消息的每个组成部分都要执行相应的反序列化操作才能得到原来的对象格式。该参数将消息的key进行反序列化,其参数值必须实现org.apache.kafka.common.serialization.Deserializer接口。针对绝大多数基本数据类型,Kafka都提供了现成的反序列化器,例如,org.apache.kafka.common.serialization.StringDeserializer。该数据类型的主要作用是将接收到的字节数组转换为UTF-8的字符串。也可以通过实现Deserializer接口,自定义反序列化机制,但是需要与生产者端定义的序列化机制保持一致。
该参数与key.deserializer类似,用来将接收到的Kafka消息的消息体(即value)进行反序列化,从而得到KafkaProducer发送的原始数据。这里需要注意的是,key.deserializer和value.deserializer可以是不同的设置。
Kafka的消费者每次拉取服务器端的消息时,总是拉取由生产者写入Kafka但还没有被消费者处理过的数据。因此,需要一种机制来记录哪些消息是被消费者组里的哪个消费者消费过的。与其他消息系统不同的是,Kafka消费者每次拉取完消息后,会记录最新的偏移量地址。下次拉取消息的时候,将会从偏移量往后拉取最新的消息数据。我们把消费者更新到当前拉取分区中的位置(即偏移量)称为提交。
消费者需要定期提交拉取的偏移量,一方面用于记录最新消费的位置信息,以便下次的拉取操作;另一方面,当消费者退出或有新的消费者加入消费者组的时候,都会触发重平衡的操作,完成重平衡后,每个消费者可能会分配到新的分区,读取新分区中的数据。为了能够继续之前的拉取工作,消费者需要读取每个partition最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
情况一:如果提交的偏移量小于客户端处理的最后一个消息的偏移量,会导致两个偏移量之间的消息被重复处理
情况二:如果提交的偏移量大于客户端处理的最后一个消息的偏移量,会导致两个偏移量之间的消息丢失。
这是一种简单的提交方式,需要把参数enable.auto.commit设置为true,那么在默认情况下,每隔5秒消费者会自动把从poll()方法接收到的最大偏移量提交上去,这个时间间隔可以通过参数auto.commit.interval.ms进行修改。当然这个自动提交是在每次进行轮询时,即调用poll()方法时进行的。消费者会检查是否该提交偏移量了,如果已经提交,就会返回上次提交时的偏移量。
自动提交虽然方便,但是也可能存在一些问题,其中最主要的问题就是可能造成消息的重复消费。在前面的内容中介绍过,重平衡的发生会有不同的情况。其中的一种情况就是当提交的偏移量小于客户端处理的最后一个消息的偏移量,会造成消息的重复消费。下面我们来举例,按照默认的5秒,系统会自动提交一次。如果在最后一次提交之后的2秒发生了重平衡。那么重平衡完成后,消费者从最后一次提交的偏移量位置开始读取消息,这时偏移量已经落后了2秒,这样就会造成这2秒内的消息被重复消费处理。
这是手动提交偏移量,将enable.auto.commit设置成false,让应用程序决定何时提交偏移量,即使用commitSync()方法提交偏移量。这种方式非常简单,也很可靠。它可以减少在平衡时重复处理的消息数量,并同时消除丢失消息的可能性。需要注意的是,commitSync()方法将提交由poll()方法返回的最新偏移量,所以在处理完所有记录后要确保调用了commitSync()方法,否则还会有丢失消息的风险。
同步提交方式会造成应用程序一直阻塞,这样会限制应用程序的吞吐量。其中的一种解决办法就是降低提交频率;另一种方式可以使用异步提交API。消费者只需要发送提交偏移量的请求,而不需要等待服务器端的响应。
异步提交不会进行重试,只是根据服务器端的响应做出相应的动作。如果在得到服务器端返回响应之前,有另一个较大的偏移量信息被成功提交,就可能造成消息的重复消费。假设我们发出一个异步请求用于提交偏移量1000,但服务器端并没有收到这样的请求.。
既然消费者执行同步提交偏移量和异步提交偏移量的两种方式,我们就可以组合使用commitSync()方法和commitAsync()方法来提交偏移量信息。这样针对偶尔出现的提交失败,不必提交偏移量的重试也不会有太大问题。
在使用同步提交偏移量和异步提交偏移量时,可以在调用commitSync()方法和commitAsync()方法时,传入希望提交的partition和offset的map,即提交特定的偏移量。
Kafka消费者端的配置参数,除了bootstrap.servers、key.deserializer、value.deserializer三个必需参数以外,还有很多可选的参数。
该参数表示Kafka Broker集群的地址信息,其格式为ip1:port、ip2:port等,不需要设定全部的集群地址,设置两个或两个以上即可。
该参数表示消费者组名称,如果group.id相同则表示属于一个消费者组中的成员。如果没有指定该参数,会报出异常。
该参数用来配置Kafka消费者在一次拉取请求中能从Kafka中拉取的最小数据量,即调用poll()方法时,每次拉取的数据量,其默认值为1字节。
消费者在拉取数据时,如果Kafka服务器端返回给消费者的数据量小于这个参数值的设定,那么消费者就需要进行等待,直到数据量满足这个参数的配置大小。因此在实际运行环境中,可以适当调大这个参数的值以提高一定的吞吐量。另外,增大这个参数值也会造成额外的延迟,因此增大该参数不适合敏感的应用。
该参数与fetch.min.bytes参数对应,它用来配置Kafka消费者在一次拉取请求中从Kafka服务器端中拉取的最大数据量,其默认值为52 428 800字节,也就是50MB。
该参数并不是绝对的最大值。试想一下,如果该参数设置的值比任何一条由生产者写入Kafka服务器端中的消息字节数小,那么会不会造成无法消费呢?如果在第一个非空分区中拉取的第一条消息字节数大于该值,那么该消息仍然返回,以确保消费者继续工作。Kafka消息系统中,能够接收的最大消息的字节数是通过服务器端参数message.max.bytes进行设置的。
该参数也和fetch.min.bytes参数有关。前面提到,如果Kafka服务器端返回给消费者的数据量小于fetch.min.bytes参数值的设定,消费者就需要等待,直到数据量满足这个参数的配置大小。然而有可能会一直等待而无法将消息发送给消费者,显然这是不合理的。fetch.max.wait.ms参数用于指定Kafka的等待时间,默认值为500ms。当Kafka满足不了fetch.min.bytes参数值的设定时,Kafka集群也会根据fetch.max.wait.ms参数值的设定,默认等待5s,然后将消息数据返回给消费者。综合来看,fetch.min.bytes和fetch.max.wait.ms都有可能造成消息的延迟处理。如果业务应用对延迟敏感,那么可以适当调小这些参数。
该参数用来配置Kafka消费者在一次拉取请求中拉取的最大消息数,其默认值为500条。如果消息数都比较小,则可以适当调大这个参数值来提升消费速度。
该参数用来配置从每个分区里返回给消费者的最大数据量,其默认值为1 048 576字节,即1MB。这个参数与fetch.max.bytes参数相似,只不过max.partition.fetch.bytes用来限制一次拉取中每个分区消息的字节数,而fetch.max.bytes用来限制一次拉取中整体消息的字节数。同样,如果这个参数设定的值比消息字节数小,那么也不会造成无法消费。
该参数用来指定在多长时间之后,关闭闲置的Kafka消费者连接,默认值是540 000ms,即9min。
该参数用来设置发送消息缓冲区(SO_SNDBUF)的大小,其默认值为131 072字节,即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。
该参数用来配置Kafka消费者等待请求响应的最长时间,其默认值为40s。
该参数用来设置接收消息缓冲区(SO_RECBUF)的大小,其默认值为65 536字节,即64KB。如果将该参数设置为-1,则使用操作系统的默认值。
该参数用来配置元数据的过期时间,其默认值为300 000ms,即5min。如果元数据在此参数限定的时间范围内没有进行更新,即使没有任何分区变化或有新的Kafka Broker加入,也会被强制更新。
该参数用来配置Kafka消费者每次尝试重新连接指定主机之前应该等待的时间,避免频繁地连接主机,其默认值为50s。
该参数值为字符串类型,其有效值为以下三个。
当各分区下有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,从头开始消费。
当各分区下有已提交的偏移量时,从提交的偏移量开始消费;无提交的偏移量时,消费新产生的该分区下的数据。
Topic各分区都存在已提交的偏移量时,从偏移量后开始消费;只要有一个分区不存在已提交的偏移量,则抛出异常。
注意,除了以上三个有效值以外,设置其他任何值都会抛出错误。
该参数值为boolean类型,配置是否开启自动提交消费位移的功能,默认开启。
该参数只有当enable.auto.commit参数设置为true时才生效,表示开启自动提交偏移量功能时自动提交消费位移的时间间隔,其默认值为5s。
该参数表示消费者的分区分配策略,支持轮询策略设置和范围策略设置。
文章来源:《Kafka进阶》 作者:赵渝强
文章内容仅供学习交流,如有侵犯,联系删除哦!