在现代的微服务架构中,消息队列已经成为了一个不可或缺的组件。
它能够帮助我们在不同的服务之间传递消息,并且能够确保这些消息不会丢失。
在众多的消息队列中,Kafka 是一个非常出色的选择。
它能够处理大量的实时数据,并且提供了强大的持久化能力。
在本文中,我们将会探讨如何在 SpringBoot 中整合 Kafka。
Apache Kafka 是一个开源的流处理平台,由 LinkedIn 团队开发并于 2011 年贡献给 Apache 基金会。Kafka 以其高吞吐量、可扩展性和容错性而闻名。它是一个基于发布/订阅模式的消息系统,通常用于大型实时数据流处理应用。
Kafka 的主要组件包括:
Kafka 可以在分布式系统中用于构建实时流数据管道,它可以在系统或应用之间可靠地获取数据。此外,Kafka 可以和 Apache Storm、Apache Hadoop、Apache Spark 等进行集成,用于大数据处理和分析。
一个公司可能有很多服务器,每个服务器上运行着很多服务,Kafka 可以用来实现这些服务的日志收集功能。各服务的日志分别发送到 Kafka 的不同 Topic 中。
Kafka 能够作为一个大规模的消息处理系统,各生产者将消息发送到 Kafka,消费者从 Kafka 中读取消息进行处理。
Kafka 也常用于用户活动跟踪和实时分析。例如,用户的点击、搜索等行为可以实时写入到 Kafka,然后进行实时或者离线分析。
在 Kafka 上可以进行实时的流处理。例如,使用 Apache Storm 集成 Kafka 来进行实时的数据处理。
统计数据和监控数据也是 Kafka 的一个重要应用场景。例如,通过 Kafka 可以收集各种分布式应用的数据,然后进行统一的处理和分析。
Kafka 可以作为大规模事件处理的源头,例如,用户的行为、系统的状态等都可以作为事件,通过 Kafka 进行分发处理。
模块 | 版本 |
---|---|
SpringBoot | 3.1.0 |
JDK | 17 |
@Configuration @EnableKafka public class KafkaConfig { @Bean public KafkaReceiver listener() { return new KafkaReceiver(); } }
@Component @Slf4j public class KafkaSender { @Resource private KafkaTemplatekafkaTemplate; public void send(String topic, String key, String data) { //发送消息 CompletableFuture > completable = kafkaTemplate.send(topic, key, data); completable.whenCompleteAsync((result, ex) -> { if (null == ex) { log.info(topic + "生产者发送消息成功:" + result.toString()); } else { log.info(topic + "生产者发送消息失败:" + ex.getMessage()); } }); } }
@Component @Slf4j public class KafkaReceiver { /** * 下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用","隔开 */ @KafkaListener(topics = {"testTopic"}) public void receive(ConsumerRecord, ?> record){ log.info("消费者收到的消息key: " + record.key()); log.info("消费者收到的消息value: " + record.value().toString()); } }
/** * kafka 测试接口 */ @RestController public class KafkaController { @Autowired private KafkaSender kafkaSender; @GetMapping("/sendMessageToKafka") public String sendMessageToKafka() { MapmessageMap = new HashMap(); messageMap.put("message", "hello world!"); ObjectMapper objectMapper = new ObjectMapper(); String data = null; try { data = objectMapper.writeValueAsString(messageMap); } catch (JsonProcessingException e) { throw new RuntimeException(e); } String key = String.valueOf(UUID.randomUUID()); //kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null) kafkaSender.send("testTopic", key, data); return "ok"; } }
http://127.0.0.1:8080/sendMessageToKafka
Error connecting to node iZbp127a9vpra4v3kmkkmzZ:9092 (id: 0 rack: null)
修改本地物理机hosts文件。文件目录:C:\Windows\System32\drivers\etc
新增 xx.xx.xx.xx iZbp127a9vpra4v3kmkkmzZ
如果没生效,则需要重启系统
通过上述的步骤,我们已经成功地在 SpringBoot 中整合了 Kafka。
这使得我们的应用程序能够在不同的服务之间传递消息,而不需要担心消息的丢失。
我们也看到,通过使用 SpringBoot,我们可以非常轻松地完成这个过程。
希望这篇文章能够帮助你在自己的项目中更好地使用 Kafka。
如果需要完整源码请关注公众号"架构殿堂" ,回复 "SpringBoot+Kafka"即可获得
感谢您的支持和鼓励! 😊🙏
如果大家对相关文章感兴趣,可以关注公众号"架构殿堂",会持续更新AIGC,java基础面试题, netty, spring boot, spring cloud等系列文章,一系列干货随时送达!