Spring-Kafka如何实现批量消费消息并且不丢失数据
作者:mmseoamin日期:2023-12-14

Spring-Kafka如何实现批量消费消息并且不丢失数据

先给答案:

		// 批量消费配置: 1批量, 2手动提交
		factory.setBatchListener(true);
		factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
		// 调大fetch的相关参数, 以便于提升吞吐量, 但会增大延时
		// 一次poll操作最大获取的记录数量
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // max.poll.records, 缺省是500
		// 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 这里给100kB, 缺省是1B
		propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // fetch.min.bytes
		// 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者
		// 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms
		propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000); // fetch.max.wait.ms, 缺省是500
		// 在消费者方法中注入acknowledgment并在执行完业务逻辑后手动调用确认方法
		acknowledgment.acknowledge();

1、背景:

某个业务对象由多张表关联而成,要创建该对象需要向多张表插入数据,基于canal的监控就会有多次该对象的变更记录,而Kafka消费的时候也会多次处理同一个对象(虽然不同表,但是同一个对象的不同部分),原有的Kafka消费者是一次处理一条,这将造成重复对同一个对象的处理。其实只需要所有表插入完毕后,一次处理该对象即可。

2、现有技术架构:

mysql --> canal --> Kafka --> Spring-Kafka消费者 --> 下游接口

3、解决方案:

优化Spring-Kafka消费者,从单条处理改为多条处理,然后在消费者中合并相同的对象,从而达到一个对象只处理一次(最多两次)的目的。为什么有可能是两次,因为分批的时候很容易将同一个对象的多条消息分到上下两批,这样这个对象将会被处理两次;那为什么不会处理三次?其实也能够制造出来上次的情况,就是分批的大小如果很小就容易出现三次,甚至更多次。所以通常情况我们将分批大小设置得要比表数量多。

举个例子:一个对象的创建有10张表的插入消息,而分批大小设置成4,此时10条消息就被4这个批大小拆分成3批,每批处理一次该对象,那么该对象就会被处理3次。所以分批大小是一个重要的参数,其值的设定通常要大,但再大也不可避免被分成两批的情况。

4、实现步骤

Spring-kafka从1.1版本开始就支持了批量消费,需要在ContainerFactory中设置batchListener=true

同时设置消费者参数 max.poll.records 来控制一批的最大记录数量,该参数的缺省值为500。

AbstractKafkaListenerContainerFactory类的源码如下:

	/**
	 * Set to true if this endpoint should create a batch listener.
	 * @param batchListener true for a batch listener.
	 * @since 1.1
	 */
	public void setBatchListener(Boolean batchListener) {
		this.batchListener = batchListener;
	}

可以从javaDoc的@since得知该功能从1.1版本就存在了,所以只要Spring-Kafka的版本高于1.1的都支持批量消费功能。这个参数是搭配@KafkaListener来使用的。单条消费的写法如下:

@KafkaListener(id = "", groupId = "", topics = {})
public void listen(ConsumerRecord data) {
}

以上只能一次处理一条,并不能将多条消息统筹处理,所以使用以下的批量消费的写法:

@KafkaListener(id = "", groupId = "", topics = {}, containerFactory = "")
public void listen(List> datas) {
}

从以上代码可以看出,批量消费仅仅是将参数的ConsumerRecord类型改为List即可。

这样就行了吗?如果批次较大,对这一批数据的处理时间较长,就容易造成丢数据。场景如下:

1、开启自动提交offset,

2、提交offset的时间间隔为1s,

3、处理这一批数据耗时为2s。

还需要发生以下条件才能导致丢消息:

1、“自动提交offset的时间”到了且成功执行了offset的提交

2、此后几乎同时程序发生了严重的错误导致进程退出(注意此时消费者的代码逻辑并未执行完毕)

那么消息就会被丢失,因为Kafka收到了offset的提交,所以Kafka认为这一批消息已经处理成功了,但程序实际并未处理成功,等到下次启动程序,将从Kafka记录的offset开始消费,“记录的offset”就是发生异常退出前“提交的offset”。所以上次异常退出时刻的那一批消息就被丢失了,不会再被消费到。

举例:

假设消费者这一批消费到的消息offset编码列表为5,6,7。而自动提交offset时候会将7提交给Kafka,表示下一个消息将从8开始消费。但567这3条消息在消费者进程中还没来得及处理完毕就被意外终止了。等到人工处理错误重新启动程序,将从8开始消费,因为Kafka认为567已经处理过了,但实际567并没有成功处理,所以就会丢失567这一批的消息。

在进一步,如何防止消息丢失呢?答案是手动提交offset,同样Spring-Kafka已经提供了支持,其实Spring-Kafka只是对原生Kafka的包装,最核心的还是原生Kafka支持手动提交offset的能力。

上干货,Spring-Kafka中有一个类很有用:AcknowledgingMessageListener,这个类就是为了支持手动ack消息的,也即是手动提交offset,只是Spring将“手动提交offset”这个概念包装成了“确认消息”,里面有一个方法:

	/**
	 * Invoked with data from kafka.
	 * @param data the data to be processed.
	 * @param acknowledgment the acknowledgment.
	 */
	@Override
	void onMessage(ConsumerRecord data, Acknowledgment acknowledgment);

这个类是设计成去实现它从而获取手动提交offset的能力,但我们还可以简化,结合之前的@KafkaListener的方法,我们将Acknowledgment acknowledgment参数放在@KafkaListener的方法中,Spring就能够将Acknowledgment对象传递进来,从而我们可以自己控制何时“确认消息”。当然仅有这步还不够,还需要告诉Spring-Kafka我需要手动提交offset,通过一个简单的设置即可:

    @Bean(name = "batch_and_manual_ack_ContainerFactory")
    public KafkaListenerContainerFactory> batch_and_manual_ack_ContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 批量消费配置: 1批量, 2手动提交
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

其中最重要的一句是:.setAckMode(AckMode.MANUAL_IMMEDIATE);,该配置的缺省值是AckMode.BATCH。可以从ContainerProperties类的源码找到缺省值:

	/**
	 * The ack mode to use when auto ack (in the configuration properties) is false.
	 * 
    *
  • RECORD: Ack after each record has been passed to the listener.
  • *
  • BATCH: Ack after each batch of records received from the consumer has been * passed to the listener
  • *
  • TIME: Ack after this number of milliseconds; (should be greater than * {@code #setPollTimeout(long) pollTimeout}.
  • *
  • COUNT: Ack after at least this number of records have been received
  • *
  • MANUAL: Listener is responsible for acking - use a * {@link org.springframework.kafka.listener.AcknowledgingMessageListener}. *
*/ private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;

要想提高吞度量,须要设置一下几个参数:

  • max.poll.records

    一次poll操作最大获取的记录数量,缺省是500。该值越大,则吞吐量也越大,但要求消费者能够在不超时的情况下处理完所有的消息。

  • fetch.min.bytes

    一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 缺省是1B

  • fetch.max.wait.ms

    一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者,缺省是500。

    需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms

    5、所有代码:

        @Bean(name = "batch_and_manual_ack_ContainerFactory")
        public KafkaListenerContainerFactory> batch_and_manual_ack_ContainerFactory() {
            ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(concurrency);
            factory.getContainerProperties().setPollTimeout(1500);
            // 批量消费配置: 1批量, 2手动提交
            factory.setBatchListener(true);
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }
        public ConsumerFactory consumerFactory() {
            return new DefaultKafkaConsumerFactory(consumerConfigs());
        }
        public Map consumerConfigs() {
            Map propsMap = new HashMap();
            propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
            propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
            propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
            propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    		// 调大fetch的相关参数, 以便于提升吞吐量, 但会增大延时
    		// 一次poll操作最大获取的记录数量
            propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // max.poll.records, 缺省是500
    		// 一次fetch操作最小的字节数, 如果低于这个字节数, 就会等待, 直到超时后才返回给消费者. 这里给100kB, 缺省是1B
    		propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024 * 100); // fetch.min.bytes
    		// 一次fetch操作的最大等待时间,“最大等待时间”与“最小字节”任何一个先满足了就立即返回给消费者
    		// 需要注意:“最大等待时间”不能超过 session.timeout.ms 和 request.timeout.ms
    		propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000); // fetch.max.wait.ms, 缺省是500
            return propsMap;
        }
        
        @KafkaListener(groupId = "junit-test-group", containerFactory = "batch_and_manual_ack_ContainerFactory", topics = {"test"})
        public void test_batchConsume(List> datas, Acknowledgment acknowledgment) {
            System.out.println(new Date() + " datas = " + datas.size());
            System.out.println(new Date() + " collect = " + datas.stream().map(t -> t.offset()).collect(Collectors.toList()));
            // 最后一定要提交进度 (用于持久化进度到Kafka)
            acknowledgment.acknowledge();
        }
    

    将以上代码放在某个Spring的类里,然后修改配置即可使用