springboot使用@KafkaListener监听多个kafka配置
作者:mmseoamin日期:2024-03-20

        背景: 使用springboot整合kafka时, springboot默认读取配置文件中 spring.kafka...配置初始化kafka, 使用@KafkaListener时指定topic即可, 当服务中需要监听多个kafka时, 需要配置多个kafka, 这种方式不适用

        方案: 可以手动读取不同kafka配置信息, 创建不同的Kafka 监听容器工厂, 使用@KafkaListener时指定相应的容器工厂, 代码如下:

1. 导入依赖

        
			org.springframework.kafka
			spring-kafka
		

2. yml配置

kafka:
  # 默认消费者配置
  default-consumer:
    # 自动提交已消费offset
    enable-auto-commit: true
    # 自动提交间隔时间
    auto-commit-interval: 1000
    # 消费的超时时间
    poll-timeout: 1500
    # 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量
    auto.offset.reset: latest
    # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
    session.timeout.ms: 120000
    # 消费请求超时时间
    request.timeout.ms: 180000
  # 1号kafka配置
  test1:
    bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
    consumer:
      group-id: xxx
      sasl.mechanism: xxxx
      security.protocol: xxxx
      sasl.jaas.config: xxxx
  # 2号kafka配置
  test2:
    bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx
    consumer:
      group-id: xxx
      sasl.mechanism: xxxx
      security.protocol: xxxx
      sasl.jaas.config: xxxx

3. 容器工厂配置

package com.zhdx.modules.backstage.config;
import com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.Map;
/**
 * kafka监听容器工厂配置
 * 

* 拓展其他消费者配置只需配置指定的属性和bean即可 */ @EnableKafka @Configuration @RefreshScope public class KafkaListenerContainerFactoryConfig { /** * test1 kafka配置 */ @Value("${kafka.test1.bootstrap-servers}") private String test1KafkaServerUrls; @Value("${kafka.test1.consumer.group-id}") private String test1GroupId; @Value("${kafka.test1.consumer.sasl.mechanism}") private String test1SaslMechanism; @Value("${kafka.test1.consumer.security.protocol}") private String test1SecurityProtocol; @Value("${kafka.test1.consumer.sasl.jaas.config}") private String test1SaslJaasConfig; /** * test2 kafka配置 */ @Value("${kafka.test2.bootstrap-servers}") private String test2KafkaServerUrls; @Value("${kafka.test2.consumer.group-id}") private String test2GroupId; @Value("${kafka.test2.consumer.sasl.mechanism}") private String test2SaslMechanism; @Value("${kafka.test2.consumer.security.protocol}") private String test2SecurityProtocol; @Value("${kafka.test2.consumer.sasl.jaas.config}") private String test2SaslJaasConfig; /** * 默认消费者配置 */ @Value("${kafka.default-consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${kafka.default-consumer.poll-timeout}") private int pollTimeout; @Value("${kafka.default-consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.default-consumer.session.timeout.ms}") private int sessionTimeoutMs; @Value("${kafka.default-consumer.request.timeout.ms}") private int requestTimeoutMs; /** * test1消费者配置 */ public Map test1ConsumerConfigs() { Map props = getDefaultConsumerConfigs(); // broker server地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test1KafkaServerUrls); // 消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, test1GroupId); // 加密 props.put(SaslConfigs.SASL_MECHANISM, test1SaslMechanism); props.put("security.protocol", test1SecurityProtocol); // 账号密码 props.put(SaslConfigs.SASL_JAAS_CONFIG, test1SaslJaasConfig); return props; } /** * test2消费者配置 */ public Map test2ConsumerConfigs() { Map props = getDefaultConsumerConfigs(); // broker server地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test2KafkaServerUrls); // 消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, test2GroupId); // 加密 props.put(SaslConfigs.SASL_MECHANISM, test2SaslMechanism); props.put("security.protocol", test2SecurityProtocol); // 账号密码 props.put(SaslConfigs.SASL_JAAS_CONFIG, test2SaslJaasConfig); return props; } /** * 默认消费者配置 */ private Map getDefaultConsumerConfigs() { Map props = Maps.newHashMap(); // 自动提交(按周期)已消费offset 批量消费下设置false props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); // 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs); // 消费请求超时时间 props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); // 序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return props; } /** * 消费者工厂类 */ public ConsumerFactory initConsumerFactory(Map consumerConfigs) { return new DefaultKafkaConsumerFactory<>(consumerConfigs); } public KafkaListenerContainerFactory> initKafkaListenerContainerFactory( Map consumerConfigs) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(initConsumerFactory(consumerConfigs)); // 是否开启批量消费 factory.setBatchListener(false); // 消费的超时时间 factory.getContainerProperties().setPollTimeout(pollTimeout); return factory; } /** * 创建test1 Kafka 监听容器工厂。 * * @return KafkaListenerContainerFactory> 返回的 KafkaListenerContainerFactory 对象 */ @Bean(name = "test1KafkaListenerContainerFactory") public KafkaListenerContainerFactory> test1KafkaListenerContainerFactory() { Map consumerConfigs = this.test1ConsumerConfigs(); return initKafkaListenerContainerFactory(consumerConfigs); } /** * 创建test2 Kafka 监听容器工厂。 * * @return KafkaListenerContainerFactory> 返回的 KafkaListenerContainerFactory 对象 */ @Bean(name = "test2KafkaListenerContainerFactory") public KafkaListenerContainerFactory> test2KafkaListenerContainerFactory() { Map consumerConfigs = this.test2ConsumerConfigs(); return initKafkaListenerContainerFactory(consumerConfigs); } }

4. @KafkaListener使用
package com.zhdx.modules.backstage.kafka;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
 * kafka监听器
 */
@Slf4j
@Component
public class test1KafkaListener {
    @KafkaListener(containerFactory = "test1KafkaListenerContainerFactory", topics = "xxx")
    public void handleHyPm(ConsumerRecord record) {
        log.info("消费到topic xxx消息:{}", JSON.toJSONString(record.value()));
    }
}