Kafa提供了如下最基本的可靠性保证:
同步副本需满足的条件:
broker级别配置:default.replication.factor=1
topic级别配置:replication.factor=1
建议非关键数据小于3
建议把broker分布在多个不同的机器上
unclean.leader.election.enable=false
指示是否启用非同步副本可以被选为首领,作为首领选举的最后手段,即使这样做可能会导致数据丢失
min.insync.replicas=1
最小同步副本数。min.insync.replicas(默认值为1)代表了正常写入生产者数据所需要的最少ISR个数, 当ISR中的副本数量小于min.insync.replicas时,Leader停止写入生产者生产的消息,并向生产者抛出NotEnoughReplicas异常,阻塞等待更多的 Follower 赶上并重新进入ISR, 因此能够容忍min.insync.replicas-1个副本同时宕机。当与min.insync.replicas和acks一起使用时,可以实现更大的耐用性保证。一个典型的场景是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用acks “all”进行生产。
replica.lag.time.max.ms=30000 (30 seconds)
如果一个follower这段时间内没有发送任何fetch请求,或者没有消费leader最新偏移量的消息,那么leader将从isr中删除该follower。
zookeeper.session.timeout.ms=18000 (18 seconds)
允许broker不向ZooKeeper发送心跳的时间间隔。如果超过这个时间不向ZK发送心跳,ZK会认为broker已经死亡,会将其移除出集群。
Kafka会在重启之前和关闭日志片段的时候将消息冲刷到磁盘上,或者等Linux系统页面缓存被填满时冲刷。拥有不同机架上的副本的多个磁盘比只写入首领磁盘更加安全。不过,也可以让broker更频繁的写入磁盘。
此设置允许指定一个间隔,在该间隔,我们将强制对写入日志的数据进行fsync。例如,如果将其设置为1,我们将在每条消息之后进行fsync;如果是5,我们将在每5条消息之后进行fsync。通常,我们建议您不要设置此项,并使用复制以提高耐用性,并允许操作系统的后台刷新功能,因为它更高效。此设置可以在每个主题的基础上覆盖(请参阅每个主题配置部分)。
此设置允许指定一个时间间隔,在该时间间隔内,我们将强制对写入日志的数据进行fsync。例如,如果将其设置为1000,我们将在1000毫秒后进行fsync。通常,我们建议您不要设置此项,并使用复制以提高耐用性,并允许操作系统的后台刷新功能,因为它更高效。
acks参数指定了生产者在多少个分区副本收到消息的情况下才会认为消息写入成功。允许以下设置:
acks=0。如果设置为零,则生产者根本不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区,并被视为已发送。在这种情况下,无法保证服务器已收到记录,重试配置也不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量将始终设置为-1。
acks=1。表示只要首领收到消息,并将记录成功写入其本地日志,就返回成功响应,不等待所有追随者的确认。在这种情况下,如果首领在确认成功后,追随者复制之前崩溃,则记录将会丢失。
acks=all。表示首领将等待同步复制集合中所有的副本都确认收到了记录。这保证了只要至少有一个同步复制副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于acks=-1的设置。
请注意,启用幂等性要求此配置值为“all”。如果设置了冲突的配置并且没有显式启用幂等性,则会禁用幂等性。
设置自动重试,并使用默认重试次数。
将delivery.timeout.ms设置成愿意等待的时长,生产者会在这段时间内重试。
例如:
group.id
auto.offset.reset
如果所有的处理逻辑都是在轮询里进行的,并且不需要维护轮询之间的状态(比如为了聚合数据),那么就很简单,可以使用自动提交,在轮询结束时提交偏移量。
enable.auto.commit
无法控制应用应用程序可能重复处理的消息的数量
如果应用程序把消息交给另一个后台线程处理,那么只能使用手动提交了
auto.commit.interval.ms
自动提交的频率过大,会增加重复的概率;过小,会增加额外开销
手动提交偏移量增加了灵活性,但也增加了复杂度并且有可能对性能产生影响,所以可能需要考虑如下事项:
如果所有的处理逻辑都是在轮询里进行的,就很简单,选择一个合适的提交频率;
如果涉及额外线程,该如何呢?
一定要在处理完后在提交偏移量
如何在分区被撤销之前提交偏移量;
如何在应用程序被分配到新分区并清理状态时提交偏移量
如果遇到批次中的部分消息需要稍后处理。因为消费者不能针对每一条消息提交偏移量,而是提交最后一条成功的偏移量,所以需要借助额外的工具来处理。
有两种模式来解决这个问题:
1.在提交偏移量的同时,状态存入另一个主题中,可以开启事务来保证一致性。当一个线程重新启动时,就可以读取状态和从偏移量处读取消息。
2. 使用流式处理框架
测试场景:
测试场景:
生产者:
消费者:
broker:
上一章主要讨论如何保证不丢失消息,但不能保证出现重复消息。
在一些简单的应用程序中,处理重复消息比较简单,消息都包含有唯一标识。
但在处理聚合事件的流式处理应用程序中,因为很难取判断结果的正确性,因为结果中可能包含了重复消息。在这种情况下,需要提供更强的保证,这种保证就是精确一次性处理语义。
如果一个操作被执行多次的结果与被执行一次相同,那么这个操作就是幂等的。
如果需要启用幂等生产者,需要在生产者端做如下配置:
其实这三个参数都已经是默认配置,无需显式添加。
如果启用了幂等生产者,那么每条消息都将包含生产者ID(PID)和序列号。所以一条消息的唯一标识由4个字段组成:
broke会用组合唯一标识来跟踪(保存在内存和硬盘中)写入每个分区的最后5条消息,重复的消息会被拒绝,记录错误指标(kafka.server:type=RequestMetrics,name=ErrorsPerSec)并返回错误。生产者会记录这个错误(record-error-rate),并反映在指标当中,但不抛出异常,也不触发告警。
乱序错误:broke收到不连续的序列号,将会返回乱序错误。
幂等生产者只能防止由生产者内部重试逻辑引起额消息重复。
如果同一个生产者调用多次producer.send()方法发送同一条消息,或多个生产者都发送同一条消息,Kafka中将产生多条消息。
Kafka的事务机制为了让流式处理应用程序生成正确的结果,要保证每个输入的消息都被精确处理一次,即使是在发生故障的情况下。
事务适用于流式处理应用程序的基础模式,即“消费-处理-生产”。
流式处理应用程序,处理过程包包含了聚合或连接操作,需要更新内部状态,事务对他们来说就会非常有用。
假设应用程序处理完一批消息并产生结果之后,在偏移量提交之前崩溃了。后来的其它实例就会重复处理这些数据,并生成结果,那么结果数据就会包含重复处理的数据。
假设一个应用程序正在处理消息的时候与Kafka暂时断开了连接,那么Kafka就会安排其它的实例继续处这批消息。这时候应用程序恢复运行了,它继续处理之前的消息批次,并写入结果。这样,结果就包含了重复处理的数据。
Kafka事务引入了原子多分区写入的概念,用来保证写入多个分区和提交偏移量是一个原子操作。
上图中,Producer A是事务性生产者,用来启动事务和执行原子多分区写入。需要给事务配置属性transactional.id并用initTransactions()方法初始化。
broker维护了producer.id和transactional.id的映射,如果使用同一个transactional.id再次调用initTransactions()方法,那么生产者分配到的producer.id和之前是一样的。
在调用initTransactions()方法初始化时,broker会为producer分配epoc,用来防止隔离僵尸应用。
用于事务传递的TransactionalId。这实现了跨越多个生产者会话的可靠性语义,因为它允许客户端保证使用相同TransactionalId的事务在开始任何新事务之前已经完成。如果没有提供TransactionalId,则生产者仅限于幂等传递。
如果配置了TransactionalId,则隐含enable.idempotence。默认情况下,未配置TransactionId,这意味着无法使用事务。请注意,默认情况下,交易需要至少三个broker的集群,这是建议的生产设置;为了进行开发,您可以通过调整broker设置transaction.state.log.replication.factor来更改这一点。
对于消费者,我们通过参数isolation.level来消费者的事务隔离级别,用啦控制消费者如何读取以事务方式写入的消息。
控制如何读取以事务方式写入的消息。如果设置为read_committed,consumer.poll()将只返回已提交的事务消息。如果设置为read_uncommitted(默认值),consumer.poll()将返回所有消息,甚至是已中止的事务消息。非事务性消息将在任一模式下无条件返回。
消息将始终以偏移顺序返回。因此,在read_committed模式下,consumer.poll()将只返回最后一个稳定偏移量(LSO, last stable offset)之前的消息,该偏移量小于第一个打开事务的偏移量。特别是,在相关交易完成之前,属于正在进行的交易的消息之后出现的任何消息都将被扣留。因此,当存在进行中事务时,read_committed消费者将无法读取高水位线。
此外,在read_committed中,seekToEnd方法将返回LSO。
在上图中,消费者B在事务提交之前,只能读取到message 2;但是消费者C则可以读取到所有消息。
Kafka事务无法实现精确一次性保证的几种场景:
比如处理数据时发送电子邮件,并不能保证只发送一次,而且无法撤销。
这种场景的流程是“消费-处理-数据库”,状态数据写入数据库。我们可以考虑在数据库中维护状态数据和偏移量,借助数据的事务来保证数据一致性。
还有一种场景是“消费-处理-主题-数据库”,状态数据既要写入主题,又要写入数据库。这个问题的一种常见的解决方案是“发件箱模式”。
应用将消息发送到一个Kafka主题(也就是发件箱),然后另外一个独立中继服务将会从Kafka读取消息并更新数据库,需要确保数据库更新是幂等的。
这个模式可以保证消息最终到达Kafka,主题消费者和数据库,要么都不到达。
这个模式的反模式是用数据库作为发件箱,然后另外一个独立中继服务将确保数据库更新也作为消息发送给Kafka。这种模式可以借助数据的内置约束(唯一索引,外键)保证精确一次性。
有一个为MM2增加精确一次性语义的改进提议:KIP665
前面讨论的精确一次性保证是针对“消费-处理-生产”的模式,而发布订阅在Kafka
中是“生产-消费-处理”的模式。针对这种模式,生产者可以开启事务,但是消费者只能通过设置隔离级别保证看不到已终止事务的消息,无法开启事务保证只消费一次消息。
package com.qupeng.demo.kafka.kafkaapache.transaction; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.*; public class KafkaTransaction{ private final static Logger logger = LoggerFactory.getLogger(KafkaTransaction.class); public KafkaProducer createKafkaProducer() { Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "your_broker_list"); producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put("transactional.id", "your_transactional_id"); Producer producer = new KafkaProducer<>(producerProps); Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "your_broker_list"); consumerProps.put("group.id", "your_group_id"); consumerProps.put("enable.auto.commit", "false"); consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("isolation.level", "read_committed"); Consumer consumer = new KafkaConsumer<>(consumerProps); producer.initTransactions(); consumer.subscribe(Arrays.asList("input_topic")); while (true) { ConsumerRecords records = null; try { records = consumer.poll(Duration.ofMillis(100)); if (records.count() > 0) { producer.beginTransaction(); for (ConsumerRecord record : records) { // 处理消息 String processedValue = processMessage(record.value()); // 发送处理后的消息到另一个主题 producer.send(new ProducerRecord<>("output_topic", record.key(), processedValue)); } Map offsetAndMetadataMap = consumerOffsets(records); producer.sendOffsetsToTransaction(offsetAndMetadataMap, consumer.groupMetadata()); producer.commitTransaction(); } } catch (WakeupException e) { // 关闭消费者 consumer.close(); throw new KafkaException(""); } catch (ProducerFencedException | InvalidProducerEpochException e) { // 程序已变为僵尸,只能退出 throw new KafkaException(String.format("The transactional.id %s is used by another process.", "your_transactional_id")); } catch (KafkaException e) { // 其它异常,中止事务,重置偏移量,并进行重试 producer.abortTransaction(); resetToLatestCommittedPositions(consumer, records); } finally { producer.close(); consumer.close(); } } } private void resetToLatestCommittedPositions(Consumer consumer, ConsumerRecords records) { for (TopicPartition partition : records.partitions()) { List > partitionRecords = records.records(partition); consumer.seek(partition, partitionRecords.get(0).offset()); } } private Map consumerOffsets(ConsumerRecords records) { Map offsetAndMetadataMap = new HashMap<>(); for (TopicPartition partition : records.partitions()) { List > partitionRecords = records.records(partition); offsetAndMetadataMap.put(partition, new OffsetAndMetadata(partitionRecords.get(partitionRecords.size() - 1).offset())); } return offsetAndMetadataMap; } private String processMessage(String value) { return "Message has been handled."; } }
Kafka事务的基本算法收到了Chandy-Lamport快照的启发,算法总体来讲就是:
由一个broker担当,通过选举产生,它也是事务日志主题分区的首领。职责是:
一个叫做__transaction_state的内部主题,用来记录事务日志,用作事务状态恢复。
一个特殊的Kafka消息,事务协调器用它来告诉所有分区提交各自的事务状态。
以下是Kafka事务处理流程概要: