背景: 使用springboot整合kafka时, springboot默认读取配置文件中 spring.kafka...配置初始化kafka, 使用@KafkaListener时指定topic即可, 当服务中需要监听多个kafka时, 需要配置多个kafka, 这种方式不适用
方案: 可以手动读取不同kafka配置信息, 创建不同的Kafka 监听容器工厂, 使用@KafkaListener时指定相应的容器工厂, 代码如下:
1. 导入依赖
org.springframework.kafka spring-kafka
2. yml配置
kafka: # 默认消费者配置 default-consumer: # 自动提交已消费offset enable-auto-commit: true # 自动提交间隔时间 auto-commit-interval: 1000 # 消费的超时时间 poll-timeout: 1500 # 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量 auto.offset.reset: latest # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) session.timeout.ms: 120000 # 消费请求超时时间 request.timeout.ms: 180000 # 1号kafka配置 test1: bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx consumer: group-id: xxx sasl.mechanism: xxxx security.protocol: xxxx sasl.jaas.config: xxxx # 2号kafka配置 test2: bootstrap-servers: xxxx:xxxx,xxxx:xxxx,xxxx:xxxx consumer: group-id: xxx sasl.mechanism: xxxx security.protocol: xxxx sasl.jaas.config: xxxx
3. 容器工厂配置
package com.zhdx.modules.backstage.config; import com.google.common.collect.Maps; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.Map; /** * kafka监听容器工厂配置 ** 拓展其他消费者配置只需配置指定的属性和bean即可 */ @EnableKafka @Configuration @RefreshScope public class KafkaListenerContainerFactoryConfig { /** * test1 kafka配置 */ @Value("${kafka.test1.bootstrap-servers}") private String test1KafkaServerUrls; @Value("${kafka.test1.consumer.group-id}") private String test1GroupId; @Value("${kafka.test1.consumer.sasl.mechanism}") private String test1SaslMechanism; @Value("${kafka.test1.consumer.security.protocol}") private String test1SecurityProtocol; @Value("${kafka.test1.consumer.sasl.jaas.config}") private String test1SaslJaasConfig; /** * test2 kafka配置 */ @Value("${kafka.test2.bootstrap-servers}") private String test2KafkaServerUrls; @Value("${kafka.test2.consumer.group-id}") private String test2GroupId; @Value("${kafka.test2.consumer.sasl.mechanism}") private String test2SaslMechanism; @Value("${kafka.test2.consumer.security.protocol}") private String test2SecurityProtocol; @Value("${kafka.test2.consumer.sasl.jaas.config}") private String test2SaslJaasConfig; /** * 默认消费者配置 */ @Value("${kafka.default-consumer.enable-auto-commit}") private boolean enableAutoCommit; @Value("${kafka.default-consumer.poll-timeout}") private int pollTimeout; @Value("${kafka.default-consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.default-consumer.session.timeout.ms}") private int sessionTimeoutMs; @Value("${kafka.default-consumer.request.timeout.ms}") private int requestTimeoutMs; /** * test1消费者配置 */ public Map
test1ConsumerConfigs() { Map props = getDefaultConsumerConfigs(); // broker server地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test1KafkaServerUrls); // 消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, test1GroupId); // 加密 props.put(SaslConfigs.SASL_MECHANISM, test1SaslMechanism); props.put("security.protocol", test1SecurityProtocol); // 账号密码 props.put(SaslConfigs.SASL_JAAS_CONFIG, test1SaslJaasConfig); return props; } /** * test2消费者配置 */ public Map test2ConsumerConfigs() { Map props = getDefaultConsumerConfigs(); // broker server地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, test2KafkaServerUrls); // 消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, test2GroupId); // 加密 props.put(SaslConfigs.SASL_MECHANISM, test2SaslMechanism); props.put("security.protocol", test2SecurityProtocol); // 账号密码 props.put(SaslConfigs.SASL_JAAS_CONFIG, test2SaslJaasConfig); return props; } /** * 默认消费者配置 */ private Map getDefaultConsumerConfigs() { Map props = Maps.newHashMap(); // 自动提交(按周期)已消费offset 批量消费下设置false props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); // 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs); // 消费请求超时时间 props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); // 序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 如果Kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除)自动将该偏移量重置成最新偏移量 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return props; } /** * 消费者工厂类 */ public ConsumerFactory initConsumerFactory(Map consumerConfigs) { return new DefaultKafkaConsumerFactory<>(consumerConfigs); } public KafkaListenerContainerFactory > initKafkaListenerContainerFactory( Map consumerConfigs) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(initConsumerFactory(consumerConfigs)); // 是否开启批量消费 factory.setBatchListener(false); // 消费的超时时间 factory.getContainerProperties().setPollTimeout(pollTimeout); return factory; } /** * 创建test1 Kafka 监听容器工厂。 * * @return KafkaListenerContainerFactory > 返回的 KafkaListenerContainerFactory 对象 */ @Bean(name = "test1KafkaListenerContainerFactory") public KafkaListenerContainerFactory > test1KafkaListenerContainerFactory() { Map consumerConfigs = this.test1ConsumerConfigs(); return initKafkaListenerContainerFactory(consumerConfigs); } /** * 创建test2 Kafka 监听容器工厂。 * * @return KafkaListenerContainerFactory > 返回的 KafkaListenerContainerFactory 对象 */ @Bean(name = "test2KafkaListenerContainerFactory") public KafkaListenerContainerFactory > test2KafkaListenerContainerFactory() { Map consumerConfigs = this.test2ConsumerConfigs(); return initKafkaListenerContainerFactory(consumerConfigs); } }
4. @KafkaListener使用
package com.zhdx.modules.backstage.kafka; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * kafka监听器 */ @Slf4j @Component public class test1KafkaListener { @KafkaListener(containerFactory = "test1KafkaListenerContainerFactory", topics = "xxx") public void handleHyPm(ConsumerRecordrecord) { log.info("消费到topic xxx消息:{}", JSON.toJSONString(record.value())); } }
上一篇:一张图搞懂微服务架构设计