若要让Spring Boot在启动时自动创建主题,可以在Spring容器中部署一个类型为NewTopic的Bean,Spring Boot将会自动为之创建对应的主题。
如果Kafka中已有同名的topic,该Bean将会被直接忽略。
发送消息很简单,Spring Boot可以将自动配置的KafkaTemplate注入任意组件,接下来该组件调用KafkaTemplate的send()方法即可发送消息。
send(String topic, V data): 发送不带key的消息。
send(String topic, K key, V data): 发送带key的消息
send(String topic, Integer partition, K key, V data): 发送到指定分区、的带key消息
send(String topic, Integer partition, Long timestamp, K key, V data): 发送到指定分区的、带key、带时间戳的消息
send(ProducerRecord
send(Message> message): 使用Message来发送消息
发送消息的方法的源码:
加了 final,这个kafkaTemplate还没有初始化,需要通过构造器进行依赖注入。
编写发送消息的方法
编写业务层,发送消息的业务逻辑
启动程序,可以看到程序帮我们连接到kafka的这些东西
这个是配置类,在启动web应用程序后,就会自动加载这个类,然后因为原先kafka集群里面没有test3这个主题,所以就帮我们创建了一个test3的主题
打开一个小黑窗,输入如下命令,然后监听这个test2主题,因为我们消息是发送到test2主题里面的
监听端口 9092 的kafka 服务器,主题是test2,然后打印出消息的创建时间,key,offset偏移量,分区。
kafka-console-consumer --bootstrap-server localhost:9092 ^ --topic test2 ^ --property print.timestamp=true ^ --property print.key=true ^ --property print.offset=true ^ --property print.partition=true
启动消息监听者
如图:发送一条不带key的消息,乱码是正常的,因为发送的消息的字符集是utf-8,这个小黑窗是gbk字符集,所以乱码。
成功通过web程序发送一下不带key的消息到kafka,并且能监听到
消息是字母就不会被乱码影响到
如果在配置文件定义了spring.kafka.producer.transaction-id-prefix属性,Spring Boot将会自动配置KafkaTransactionManager,并为KafkaTemplate自动应用该事务管理器。
如果在Spring容器中配置RecordMessageConverter,该Bean将会自动作为KafkaTemplate的消息转换器。
在默认情况下,其实无需配置自定义的消息转换器,可见即使不配置自定义的消息转换器,Spring Boot其实完全可以处理得很好。
只有当消息里面有一些key或者value是很特别的类型的时候,才需要用到这个 RecordMessageConverter 对象,一般来说都不会用到。或者我们在发送消息之前先把类型做一些转换处理。
@KafkaListener 注解修饰的方法会被注册为消息监听器方法。该注解支持如下常用的属性:
topics- 指定要监听的主题 topicPattern:指定要监听的主题的正则表达式 topicPartitions:指定要监听的主题的分区 groupId:指定组id
如果没有显式地通过 containerFactory 属性指定监听器容器工厂(KafkaListenerContainerFactory),
Spring Boot 会在容器中自动配置一个 ConcurrentKafkaListenerContainerFactory Bean 作为监听器容器工厂。
若要对 ConcurrentKafkaListenerContainerFactory 进行设置,可在 application.properties 配置文件中增加以 spring.kafka.listener.* 开头的配置属性。
例如如下配置:
# 设置监听器的确认模式 spring.kafka.listener.ack-mode=batch
如果在 Spring 容器中配置了 KafkaTransactionManager,它将会被自动关联到 Kafka 监听器容器工厂。
如果在 spring 容器中配置了 RecordFilterStrategy、ErrorHandler、AfterRollbackProcessor 或 ConsumerAwareRebalanceListener,它们也会被自动关联到监听器容器工厂。
使用如下属性可将监听器配置成处理单条消息的监听器:
spring.kafka.listener.type=single
Spring Boot会自动将容器中的RecordMessageConverter Bean关联到默认的监听器容器工厂。
设置如下属性将监听器配置成批处理的消息监听器:
spring.kafka.listener.type=batch
Spring Boot会自动将容器中的BatchMessageConverter Bean关联到默认的监听器容器工厂。
如果容器中只有一个RecordMessageConverter Bean(处理单条消息的转换器),且配置了批处理的消息监听器,那它就会被包装成BatchMessageConverter转换器。
【备注】:简而言之一句话,如果你显式配置了批量消息的转换器,那么该转换器将会被关联到监听器容器工厂。
如果你配置单条消息的转换器,但你又需要对消息进行批量监听,Spring Boot会自动将单条消息的转换器包装成批量的转换器。
以下配置对监听器的容器工厂生效
创建一个消息监听器的组件,项目启动的时候就会自动执行到这个监听器。
@Component 是 Spring 框架中的一个通用注解,用于标记一个类为组件,让 Spring 能够在应用程序启动时自动扫描并加载这些组件。
注意点:
如图:创建了两个监听消息的方法,都是监听着 test2 主题,
不同的是:方法1 属于 groupA 这个组,方法2 属于 groupB 这个组,因为是属于不同的组,所以功能是类似 pub/sub 这个发布订阅模型的,就是他们都订阅了相同的主题,就都能消费到test2主题的消息。
如果方法1和方法2 属于同一个组,那么这两个监听方法就是以轮询的方式监听着test2主题里面的消息。
如图:
方法1 属于 groupA 这个组,方法2 属于 groupB 这个组,因为是属于不同的组,所以功能是类似 pub/sub 这个发布订阅模型的,就是他们都订阅了相同的主题,就都能消费到test2主题的消息。
如图:我发送的消息,因为两个监听方法不是同一个组的,所以都能消费到该主题的消息。
之前是小黑窗输入命令监听,现在直接启动项目,在控制台看监听的消息。
发送不带key的消息,也是一样的。
如图:方法1和方法2 属于同一个组,那么这两个监听方法就是以轮询的方式监听着test2主题里面的消息。
如图:test2主题有四个分区,然后我们写了两个监听方法,意味着一个监听方法会被分配去监听2个分区的消息。
然后我们发送不带key的消息,那么这些消息会被随机分配到某个分区里面,然后被监听该分区的监听方法监听到。
如图可以看到,监听器A 在监听 分区3 的消息,消息i 和 消息k 因为都被分配到 分区3,所以都被 监听器A 监听到。
消息 j 被发送到分区0 ,被 监听器B 监听到,也就是表明 监听器B 被 Kafka 分配去监听分区0的消息
注意点:这里说的同一个组的2个监听器以轮询的方式监听同一个主题的消息,是kafka先给两个监听器分配它们各自负责监听哪个分区的消息,然后当我们把消息发送到某个分区时,那么负责监听该分区消息的监听器就会监听到该消息并做一些业务处理。
如图:如上面所说,这里可以看出,分区2也是监听器A在监听,而且同一个key的消息,都是发送到同一个分区里面的,所以这些消息都是被监听器A监听着。
发送不同的消息,但是key一样,也都是发送到同一个分区,而这个分区被监听器A监听着。所以无论发多少条消息,只要key是ljh,那么都是监听器A在监听。
test2 主题有4个分区,然后我们写了两个监听器,那么kafka 就会为这两个监听器分配它们去监听哪个分区的消息,以为分配是以轮询的方法分配的。所以刚好一个监听器被分配去监听2个分区。
(如果有5个分区,那么一般就是 监听器A被分配监听3个分区,监听器B被分配监听2个分区,这个分配分区的规则前面的文章有提到:分区分配规则)
因为同一个key的消息,无论发送多少条,一般都是发往同一个分区的,和上面说的同一个组的监听器,以轮询的方式监听同一个主题的消息时,两者并不冲突,需要结合具体的消息类型(这里说的是消息的key是否相同)来理解,
注意点:
通过springboot来整合kafka,演示的时候就需要来通过小黑窗输入命令来演示发送消息和消费消息了。
直接在web端发送消息,然后在IDEA控制台看监听到的消息就可以了
# 由于zookeeper 默认占用了8080端口,那么web应用的端口修改成8081 server.port=8081 # 指定连接 kafka 的 Broker 服务器的地址 spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094 # 发送请求是传递给服务器的ID,用于服务器端做日志 spring.kafka.client-id=ljh-boot # 生产者相关的配置 # 指定Kafka的消息确认机制 --> 0:不等待消息确认;1:只等待领导者分区的消息写入之后确认;all:等待所有分区的消息都写入之后才确认 spring.kafka.producer.acks=all # 指定消息发送失败的重试多少次 spring.kafka.producer.retries=0 # 当 Producer 内部类没有为 linger.ms 配置属性提供对应的字段时,可通过 properties 来设置 # springboot 没有为这个属性添加对应的字段,所以我们需要自己用properties来给其添加进去 spring.kafka.producer.properties[linger.ms]=3 # 设置序列化器 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 消费者相关的配置 # 自动提交offset,就是类似之前的自动消息确认 spring.kafka.consumer.enable-auto-commit=true # 多个消息之间,自动提交消息的时间间隔 # 当 Consumer 内部类没有为 auto.commit.interval.ms 这个配置属性提供对应的字段时,可通过 properties 来设置 # springboot 没有为这个属性添加对应的字段,所以我们需要自己用properties来给其添加进去 spring.kafka.consumer.properties[auto.commit.interval.ms]=1000 # 设置session的超时时长,默认是10秒,这里设置15秒 spring.kafka.consumer.properties[session.timeout.ms]=15000 # 设置每次都从最新的消息开始读取 spring.kafka.consumer.auto-offset-reset=latest # 设置序列化器 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 以下配置对监听器的容器工厂生效 # 配置消息如何提交offset spring.kafka.listener.ack-mode=batch # 指定消息监听器每次处理消息的数量(single:单条消息; batch:批量处理) spring.kafka.listener.type=single # 轮询的超时时间 spring.kafka.listener.poll-timeout=5s
该类的作用是用来自动创建主题
package cn.ljh.kafkaboot.config; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //kafka 配置类 // proxyBeanMethods = false:表示这个配置类不用帮我们做一些其他的代理操作,只用我自己写的bean就可以了 @Configuration(proxyBeanMethods = false) public class KafkaConfig { //用来自动创建主题 @Bean public NewTopic test3() { //参数1:主题名字 参数2:分区数量 参数3:复制因子数量 return new NewTopic("test3", 3, (short) 2); } }
发送消息
package cn.ljh.kafkaboot.controller; import cn.ljh.kafkaboot.service.MessageService; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; /** * author JH */ @RestController public class MessageController { //业务层 public final MessageService messageService; //构造器依赖注入 public MessageController(MessageService messageService) { this.messageService = messageService; } //发送带key的消息 @GetMapping("/send/{key}/{value}") public String sendMessage(@PathVariable String key, @PathVariable String value){ //发送带key的消息成功 messageService.sendMessage(key,value); return "发送带key的消息成功"; } //发送不带key的消息 @GetMapping("/sendNoKey/{value}") public String sendNoKey(@PathVariable String value) { //发送不带key的消息成功 messageService.sendMessage(null,value); return "发送不带key的消息成功"; } }
发送消息的业务逻辑
package cn.ljh.kafkaboot.service; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import java.util.Objects; /** * author JH */ @Service public class MessageService { //定义一个主题的常量 public static final String TARGET_TOPIC = "test2"; //因为没有自己写KafkaTemplate这个类,所以是没有初始化的,需要进行依赖注入才行 private final KafkaTemplatekafkaTemplate; //通过构造函数进行依赖注入 public MessageService(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } //发送消息 public void sendMessage(String key, String value) { //如果这个key不为空,则为true if(Objects.nonNull(key)) { //发送带 key、value的消息 kafkaTemplate.send(TARGET_TOPIC,key,value); } else { //发送不带 key 的消息 kafkaTemplate.send(TARGET_TOPIC,value); } } }
监听和消费消息
package cn.ljh.kafkaboot.listener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @KafkaListener 注解修饰的方法会被注册为消息监听器方法。该注解支持如下常用的属性: * topics- 指定要监听的主题 * topicPattern:指定要监听的主题的正则表达式 * topicPartitions:指定要监听的主题的分区 * groupId:指定组id */ // @Component是Spring框架中的一个通用注解,用于标记一个类为组件,让Spring能够在应用程序启动时自动扫描并加载这些组件 @Component public class KafkaMessageListener { //因为这两个监听器不属于同一个组,相当于pub/sub模型,都能监听到同一个主题的消息 //监听器方法A:监听 test2 主题,这个监听器是属于groupA这个组 @KafkaListener(topics = "test2",groupId = "groupA") public void processMsg01(ConsumerRecordmsg) { System.err.printf("监听器 【 A 】 收到消息, offset = %d , partition=%s , key = %s, value = %s%n", msg.offset(),msg.partition(), msg.key(), msg.value()); } //监听器方法B:监听 test2 主题,这个监听器是属于groupB这个组 @KafkaListener(topics = "test2",groupId = "groupB") public void processMsg02(ConsumerRecord msg) { System.err.printf("监听器 【 B 】 收到消息, offset = %d , partition=%s , key = %s, value = %s%n", msg.offset(),msg.partition(), msg.key(), msg.value()); } }
4.0.0 org.springframework.boot spring-boot-starter-parent 2.4.5 cn.ljh kafkaboot 1.0.0 kafkaboot 11 UTF-8 org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka org.springframework.boot spring-boot-devtools runtime true org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.springframework.kafka spring-kafka-test test org.springframework.boot spring-boot-maven-plugin org.projectlombok lombok
1、启动 zookeeper 服务器端 小黑窗输入命令: zkServer 2、启动 Kafka 服务器: 第1个kafka服务器,也就是第1个节点:9092 kafka-server-start E:/install/kafka_2.13-3.6.1/config/server.properties 第2个kafka服务器,也就是第2个节点:9093 kafka-server-start E:/install/kafka_2.13-3.6.1/config/server-1.properties 第3个kafka服务器,也就是第3个节点:9094 kafka-server-start E:/install/kafka_2.13-3.6.1/config/server-2.properties 3、打开小黑窗,运行如下命令来启动CMAK: E:\cmak\bin\cmak.bat 4、打开CMAK图形界面 http://localhost:9000/
小黑窗启动效果如图:
CMAK 启动效果如图:
注意点,每次打开小黑窗,要重新运行命令来启动CMAK:
E:\cmak\bin\cmak.bat
需要把C盘的这个文件删除掉,重启启动生成才行,不然这个命令会运行失败