我们作为Kafka在使用Kafka是,必然考虑消息消费失败的重试次数,重试后仍然失败如何处理,要么阻塞,要么丢弃,或者保存
Kafka3.0 版本默认失败重试次数为10次,准确讲应该是1次正常调用+9次重试,这个在这个类可以看到 org.springframework.kafka.listener.SeekUtils
据我的实验,spring-kafka3.0版本通过application.yml 配置是行不通的,也没有找到任何一项配置可以改重试次数的(网上很多说的通过配置spring.kafka.consumer.retries 可以配置,我尝试过了,至少3.0版本是不行的,如果有人成功试过可以通过application.yml 配置消费者的消费的重试次数可以留言通知我,谢谢)
经过我不懈努力和尝试,只能通过Java代码配置的方式才可以,并且这种方式相对于application.yml配置更加灵活细致,上代码
public CommonErrorHandler commonErrorHandler() { BackOff backOff = new FixedBackOff(5000L, 3L); return new DefaultErrorHandler(backOff); }
然后把这个handler 添加到ConcurrentKafkaListenerContainerFactory中就行了
我们需要在创建DefaultErrorHandler类时加入一个ConsumerAwareRecordRecoverer参数就可以了,这样在重试3次后仍然失败就会保存到数据库中,注意这里save to db成功之后,我认为没有必要执行consumer.commitSync方法,首先这个consumer.commitSync这个方法默认是提交当前批次的最大的offset(可能会导致丢失消息),其次不提交Kafka的消费者仍然回去消费后面的消息,只要后面的消息,消费成功了,那么依然会提交offset,覆盖了这个offset
public CommonErrorHandler commonErrorHandler() { // 创建 FixedBackOff 对象 BackOff backOff = new FixedBackOff(5000L, 3L); DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> { log.info("save to db " + record.value().toString()); }, backOff); return defaultErrorHandler; }
如果你硬要提交也可以试试下面这种,指定提交当前的offset
public CommonErrorHandler commonErrorHandler() { // 创建 FixedBackOff 对象 BackOff backOff = new FixedBackOff(5000L, 3L); DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> { log.info("save to db " + record.value().toString()); Mapoffsets = new HashMap<>(); offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset())); consumer.commitSync(offsets); }, backOff); return defaultErrorHandler; }
仍然在创建DefaultErrorHandler类时加入一个DeadLetterPublishingRecoverer 类就行了,默认会把消息发到kafkaTemplate 配置的topic名字为your_topic+.DLT
@Autowired private KafkaTemplatekafkaTemplate; public CommonErrorHandler commonErrorHandler() { // 创建 FixedBackOff 对象 BackOff backOff = new FixedBackOff(5000L, 3L); DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff); return defaultErrorHandler; }
ConsumerRecordRecoverer 接口总共就这2种实现方式
kafka-consumer: bootstrapServers: 192.168.31.114:9092 groupId: goods-center #后台的心跳线程必须在30秒之内提交心跳,否则会reBalance sessionTimeOut: 30000 autoOffsetReset: latest #取消自动提交,即便如此 spring会帮助我们自动提交 enableAutoCommit: false #自动提交间隔 autoCommitInterval: 1000 #拉取的最小字节 fetchMinSize: 1 #拉去最小字节的最大等待时间 fetchMaxWait: 500 maxPollRecords: 50 #300秒的提交间隔,如果程序大于300秒提交,会报错 maxPollInterval: 300000 #心跳间隔 heartbeatInterval: 10000 keyDeserializer: org.apache.kafka.common.serialization.LongDeserializer valueDeserializer: org.springframework.kafka.support.serializer.JsonDeserializer
package com.ychen.goodscenter.fafka; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @Configuration //指定配置文件的前缀 @ConfigurationProperties(prefix = "kafka-consumer") @Getter @Setter public class KafkaListenerProperties { private String groupId; private String sessionTimeOut; private String bootstrapServers; private String autoOffsetReset; private boolean enableAutoCommit; private String autoCommitInterval; private String fetchMinSize; private String fetchMaxWait; private String maxPollRecords; private String maxPollInterval; private String heartbeatInterval; private String keyDeserializer; private String valueDeserializer; }
package com.ychen.goodscenter.fafka; import com.alibaba.fastjson2.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.listener.*; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.FixedBackOff; import java.util.HashMap; import java.util.Map; @Configuration @EnableConfigurationProperties(KafkaListenerProperties.class) @Slf4j public class KafkaConsumerConfig { @Autowired private KafkaListenerProperties kafkaListenerProperties; @Autowired private KafkaTemplatekafkaTemplate; @Bean public KafkaListenerContainerFactory > kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // 并发数 多个微服务实例会均分 factory.setConcurrency(2); // factory.setBatchListener(true); factory.setCommonErrorHandler(commonErrorHandler()); ContainerProperties containerProperties = factory.getContainerProperties(); // 是否设置手动提交 containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } private ConsumerFactory consumerFactory() { Map consumerConfigs = consumerConfigs(); log.info("消费者的配置信息:{}", JSONObject.toJSONString(consumerConfigs)); return new DefaultKafkaConsumerFactory<>(consumerConfigs); } public CommonErrorHandler commonErrorHandler() { // 创建 FixedBackOff 对象 BackOff backOff = new FixedBackOff(5000L, 3L); DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), backOff); // DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler((ConsumerAwareRecordRecoverer) (record, consumer, exception) -> { // log.info("save to db " + record.value().toString()); // Map offsets = new HashMap<>(); // offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset())); // consumer.commitSync(offsets); // }, backOff); return defaultErrorHandler; } public Map consumerConfigs() { Map propsMap = new HashMap<>(); // 服务器地址 propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrapServers()); // 是否自动提交 propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerProperties.isEnableAutoCommit()); // 自动提交间隔 propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getAutoCommitInterval()); //会话时间 propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerProperties.getSessionTimeOut()); //key序列化 propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getKeyDeserializer()); //value序列化 propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getValueDeserializer()); // 心跳时间 propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getHeartbeatInterval()); // 分组id propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroupId()); //消费策略 propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerProperties.getAutoOffsetReset()); // poll记录数 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerProperties.getMaxPollRecords()); //poll时间 propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaListenerProperties.getMaxPollInterval()); propsMap.put("spring.json.trusted.packages", "com.ychen.**"); return propsMap; } }
package com.ychen.goodscenter.fafka; import com.ychen.goodscenter.service.OrderService; import com.ychen.goodscenter.vo.req.SubmitOrderReq; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; @Component @Slf4j public class MessageListener { @Autowired private OrderService orderService; @KafkaListener(topics = "order-message-topic", containerFactory = "kafkaListenerContainerFactory") public void processMessage(ConsumerRecordrecord, Acknowledgment acknowledgment) { log.info("order-message-topic message Listener, Thread ID: " + Thread.currentThread().getId()); try { log.info("order-message-topic message received, orderId: {}", record.value().getOrderId()); orderService.submitOrder(record.value()); // 同步提交 acknowledgment.acknowledge(); log.info("order-message-topic message acked: orderId: {}", record.value().getOrderId()); } catch (DuplicateKeyException dupe) { // 处理异常情况 log.error("order-message-topic message error DuplicateKeyException", dupe); // 重复数据,忽略掉,同步提交 acknowledgment.acknowledge(); } } }
上一篇:Springboot之自定义注解