Connecting a Microservice to Multiple Kafka Clusters
Author: mmseoamin | Date: 2024-02-06

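I have recently been migrating and refactoring some microservices, and one of them needs to subscribe to several Kafka clusters. Spring Kafka's auto-configuration only lets you configure a single Kafka, which does not meet that requirement. This article summarizes how to configure multiple Kafka connections; I hope it helps.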

Contents

  • Preparation
  • Minimal Kafka configuration
  • Multiple-Kafka configuration

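    Preparation

    • Set up your own Kafka

      Download Kafka from the official site and pick a release that matches your Spring Boot version. Fortunately the supported range is wide: at the time of writing the latest release is 3.2.1, and versions in the range 2.12 to 3.2.1 are supported, which covers Spring Boot 2.0.x through 3.0.x.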

      https://kafka.apache.org/downloads


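    • Extract and install

      Go into the bin directory and run the following commands in the order shown.

      Linux

      # Point this at the config file in your own installation directory
      ./zookeeper-server-start.sh ../config/zookeeper.properties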
      

      Windows

      windows\zookeeper-server-start.bat ..\config\zookeeper.properties
      

      Open a second terminal and start the Kafka server.

      Linux

      ./kafka-server-start.sh ../config/server.properties
      

      Windows

      windows\kafka-server-start.bat ..\config\server.properties
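
      Before wiring up Spring, it can help to confirm that the broker started in the previous step is actually listening. The following is a minimal sketch in plain Java, assuming the broker uses port 9092 on localhost (the address used in the properties files later in this article). The class and method names are hypothetical. A successful TCP connection only shows that the port is open, not that Kafka is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerPortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host: treat all as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the broker listens on localhost:9092, as in this article's properties.
        System.out.println("Kafka port open: " + isReachable("localhost", 9092, 2000));
    }
}
```

      If this prints false, check that both ZooKeeper and the Kafka server are still running in their terminals before continuing.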
      

      Minimal Kafka configuration

      The following is the minimal setup for a single Kafka.

      pom.xml: add the dependency

      
      <dependency>
      	<groupId>org.springframework.kafka</groupId>
      	<artifactId>spring-kafka</artifactId>
      </dependency>
      
      

      application.properties

      server.port=8090
      spring.application.name=single-kafka-server
      # Kafka broker address
      spring.kafka.bootstrap-servers=localhost:9092
      # Consumer group; it is created automatically once configured
      spring.kafka.consumer.group-id=default_group
      

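      KafkaProducer (the producer)

      import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.scheduling.annotation.EnableScheduling;
      import org.springframework.scheduling.annotation.Scheduled;
      import org.springframework.stereotype.Component;

      @Slf4j
      @Component
      @EnableScheduling
      public class KafkaProducer {
          // Auto-configured by Spring Boot from the spring.kafka.* properties
          @Resource
          private KafkaTemplate<String, String> kafkaTemplate;

          private void sendTest() {
              // The topic is created automatically (with the broker's default settings)
              kafkaTemplate.send("topic1", "hello kafka");
          }

          // Send one test message every 10 seconds
          @Scheduled(fixedRate = 1000 * 10)
          public void testKafka() {
              log.info("send message...");
              sendTest();
          }
      }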
      

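      KafkaConsumer (the consumer)

      import lombok.extern.slf4j.Slf4j;
      import org.springframework.kafka.annotation.KafkaListener;
      import org.springframework.stereotype.Component;

      @Slf4j
      @Component
      public class KafkaConsumer {
          // Uses the default auto-configured listener container factory
          @KafkaListener(topics = {"topic1"})
          public void processMessage(String spuId) {
              log.warn("process spuId ={}", spuId);
          }
      }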
      

      Result:

      [Image 2: screenshot of the run result]

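      Multiple-Kafka configuration

      This setup is slightly more involved. The key idea is to create the beans manually instead of relying on auto-configuration. Add the same dependency as before.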

      pom.xml

      
      <dependency>
      	<groupId>org.springframework.kafka</groupId>
      	<artifactId>spring-kafka</artifactId>
      </dependency>
      
      

      application.properties

      server.port=8090
      spring.application.name=kafka-server
      # kafka1
      # Broker address
      spring.kafka.one.bootstrap-servers=localhost:9092
      spring.kafka.one.consumer.group-id=default_group
      # kafka2 (the same local broker in this demo; point it at the second cluster in practice)
      spring.kafka.two.bootstrap-servers=localhost:9092
      spring.kafka.two.consumer.group-id=default_group2
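
      Note that spring.kafka.one.* and spring.kafka.two.* are custom key names chosen for this article, not keys that Spring Boot's Kafka auto-configuration reads, so they only take effect because the configuration classes below read them explicitly. As a rough illustration of how the two prefixes keep the settings apart, here is a small sketch in plain Java. The class and method names are hypothetical, and the sketch does not use Spring at all.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

class KafkaPrefixDemo {
    /** Collects every "<prefix>.<rest>" entry into a sorted map keyed by "<rest>". */
    static Map<String, String> subProperties(Properties all, String prefix) {
        Map<String, String> result = new TreeMap<>();
        String dotted = prefix + ".";
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(dotted)) {
                result.put(key.substring(dotted.length()), all.getProperty(key));
            }
        }
        return result;
    }

    /** The same four Kafka entries as in application.properties above. */
    static Properties sample() {
        Properties props = new Properties();
        props.setProperty("spring.kafka.one.bootstrap-servers", "localhost:9092");
        props.setProperty("spring.kafka.one.consumer.group-id", "default_group");
        props.setProperty("spring.kafka.two.bootstrap-servers", "localhost:9092");
        props.setProperty("spring.kafka.two.consumer.group-id", "default_group2");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("one = " + subProperties(sample(), "spring.kafka.one"));
        System.out.println("two = " + subProperties(sample(), "spring.kafka.two"));
    }
}
```

      Each prefix yields its own bootstrap-servers and consumer.group-id pair, which is the separation the two configuration classes rely on.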
      

      Configuration for the first Kafka. The bean names must differ from those used for the second Kafka.

      KafkaOneConfig

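      import java.util.HashMap;
      import java.util.Map;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import org.apache.kafka.common.serialization.StringSerializer;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
      import org.springframework.kafka.config.KafkaListenerContainerFactory;
      import org.springframework.kafka.core.ConsumerFactory;
      import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
      import org.springframework.kafka.core.DefaultKafkaProducerFactory;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.kafka.core.ProducerFactory;
      import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

      @Configuration
      public class KafkaOneConfig {
          @Value("${spring.kafka.one.bootstrap-servers}")
          private String bootstrapServers;
          @Value("${spring.kafka.one.consumer.group-id}")
          private String groupId;

          // Bean name defaults to the method name: kafkaOneTemplate
          @Bean
          public KafkaTemplate<String, String> kafkaOneTemplate() {
              return new KafkaTemplate<>(producerFactory());
          }

          @Bean(name = "kafkaOneContainerFactory")
          KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaOneContainerFactory() {
              ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(consumerFactory());
              factory.getContainerProperties().setPollTimeout(3000);
              return factory;
          }

          private ProducerFactory<String, String> producerFactory() {
              return new DefaultKafkaProducerFactory<>(producerConfigs());
          }

          private ConsumerFactory<String, String> consumerFactory() {
              return new DefaultKafkaConsumerFactory<>(consumerConfigs());
          }

          private Map<String, Object> producerConfigs() {
              Map<String, Object> props = new HashMap<>();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              return props;
          }

          private Map<String, Object> consumerConfigs() {
              Map<String, Object> props = new HashMap<>();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
              props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              return props;
          }
      }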
      

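      kafkaOneTemplate: the high-level template for the first Kafka, used to send messages

      kafkaOneContainerFactory: the listener container factory, referenced from @KafkaListener through its containerFactory attribute

      producerFactory: the producer factory

      consumerFactory: the consumer factory

      producerConfigs: the producer settings

      consumerConfigs: the consumer settings

      Create the second Kafka in the same way. Its settings have the same meaning as those of the first.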

      KafkaTwoConfig

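      import java.util.HashMap;
      import java.util.Map;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import org.apache.kafka.common.serialization.StringSerializer;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
      import org.springframework.kafka.config.KafkaListenerContainerFactory;
      import org.springframework.kafka.core.ConsumerFactory;
      import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
      import org.springframework.kafka.core.DefaultKafkaProducerFactory;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.kafka.core.ProducerFactory;
      import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

      @Configuration
      public class KafkaTwoConfig {
          @Value("${spring.kafka.two.bootstrap-servers}")
          private String bootstrapServers;
          @Value("${spring.kafka.two.consumer.group-id}")
          private String groupId;

          // Bean name defaults to the method name: kafkaTwoTemplate
          @Bean
          public KafkaTemplate<String, String> kafkaTwoTemplate() {
              return new KafkaTemplate<>(producerFactory());
          }

          @Bean(name = "kafkaTwoContainerFactory")
          KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaTwoContainerFactory() {
              ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(consumerFactory());
              factory.getContainerProperties().setPollTimeout(3000);
              return factory;
          }

          private ProducerFactory<String, String> producerFactory() {
              return new DefaultKafkaProducerFactory<>(producerConfigs());
          }

          private ConsumerFactory<String, String> consumerFactory() {
              return new DefaultKafkaConsumerFactory<>(consumerConfigs());
          }

          private Map<String, Object> producerConfigs() {
              Map<String, Object> props = new HashMap<>();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              return props;
          }

          private Map<String, Object> consumerConfigs() {
              Map<String, Object> props = new HashMap<>();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
              props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              return props;
          }
      }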
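
      KafkaOneConfig and KafkaTwoConfig differ only in the property values they read, so the map-building code could be shared. The following is one possible sketch, not the approach this article uses: a hypothetical helper class KafkaClientProps that takes the address and group id as parameters. To stay runnable without any Kafka dependency it uses the plain string keys behind the ProducerConfig and ConsumerConfig constants and the fully qualified class names of the String serializers, both of which Kafka clients accept in place of the constants and Class objects.

```java
import java.util.HashMap;
import java.util.Map;

class KafkaClientProps {
    // Fully qualified names of the String (de)serializers shipped with kafka-clients.
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /** Producer settings equivalent to producerConfigs() in the config classes. */
    static Map<String, Object> producerConfigs(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers); // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("key.serializer", STRING_SERIALIZER);   // ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
        props.put("value.serializer", STRING_SERIALIZER); // ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
        return props;
    }

    /** Consumer settings equivalent to consumerConfigs() in the config classes. */
    static Map<String, Object> consumerConfigs(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);     // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("group.id", groupId);                       // ConsumerConfig.GROUP_ID_CONFIG
        props.put("key.deserializer", STRING_DESERIALIZER);   // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
        props.put("value.deserializer", STRING_DESERIALIZER); // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the application.properties shown earlier.
        System.out.println(consumerConfigs("localhost:9092", "default_group").get("group.id"));
        System.out.println(consumerConfigs("localhost:9092", "default_group2").get("group.id"));
    }
}
```

      With a helper like this, each configuration class would only need to pass its own bootstrapServers and groupId, while the @Bean methods and their distinct names stay where they are.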
      

      Create a test consumer. Note that each listener specifies a different listener container through containerFactory.

      KafkaConsumer

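      import lombok.extern.slf4j.Slf4j;
      import org.springframework.kafka.annotation.KafkaListener;
      import org.springframework.stereotype.Component;

      @Slf4j
      @Component
      public class KafkaConsumer {
          // Consumes from the first Kafka
          @KafkaListener(topics = {"topic1"}, containerFactory = "kafkaOneContainerFactory")
          public void oneProcessItemcenterSpuMessage(String spuId) {
              log.warn("one process spuId ={}", spuId);
          }

          // Consumes from the second Kafka
          @KafkaListener(topics = {"topic2"}, containerFactory = "kafkaTwoContainerFactory")
          public void twoProcessItemcenterSpuMessage(String spuId) {
              log.warn("two process spuId ={}", spuId);
          }
      }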
      

      Create a test producer that periodically sends a message to each of the two topics.

      KafkaProducer

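      import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.scheduling.annotation.EnableScheduling;
      import org.springframework.scheduling.annotation.Scheduled;
      import org.springframework.stereotype.Component;

      @Slf4j
      @Component
      @EnableScheduling // needed for @Scheduled unless scheduling is enabled elsewhere
      public class KafkaProducer {
          // @Resource injects by name, so each field name must match its bean name
          @Resource
          private KafkaTemplate<String, String> kafkaOneTemplate;
          @Resource
          private KafkaTemplate<String, String> kafkaTwoTemplate;

          private void sendTest() {
              kafkaOneTemplate.send("topic1", "hello kafka one");
              kafkaTwoTemplate.send("topic2", "hello kafka two");
          }

          // Send one test message to each Kafka every 10 seconds
          @Scheduled(fixedRate = 1000 * 10)
          public void testKafka() {
              log.info("send message...");
              sendTest();
          }
      }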
      

      Final result:

      [Image 3: screenshot of the run result]
