Kafka调试
作者:mmseoamin日期:2024-02-04

Kafka安装配置

安装

kafka官网https://kafka.apache.org/downloads

wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz

tar xf kafka_2.12-2.7.2.tgz -C /usr/local/   #将kafka安装到了/usr/local目录下

mv /usr/local/kafka_2.12-2.7.2 /usr/local/kafka  #新建存放日志和数据的文件夹

mkdir /usr/local/kafka/logs

配置

afka的主配置文件为/usr/local/kafka/config/server.properties,这里以节点kafkazk1为例,重点介绍一些常用配置项的含义:

broker.id=1 
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=10.0.0.6 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
listeners=PLAINTEXT://10.0.0.6:9092 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
log.dirs=/usr/local/kafka/logs 
#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
num.partitions=6 #默认的分区数,一个topic默认1个分区数
num.recovery.threads.per.data.dir=1 
offsets.topic.replication.factor=1 
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
message.max.byte=5242880  #消息保存的最大值5M
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
zookeeper.connect=localhost:2181 #不是集群,所以可以写成localhost 
#zookeeper.connect=10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181  #集群
zookeeper.connection.timeout.ms=18000 
group.initial.rebalance.delay.ms=0 auto.create.topics.enable=true 
delete.topic.enable=true

每个配置项含义如下:

  • broker.id:每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况。
  • listeners:设置kafka的监听地址与端口,可以将监听地址设置为主机名或IP地址,这里将监听地址设置为IP地址。
  • log.dirs:这个参数用于配置kafka保存数据的位置,kafka中所有的消息都会存在这个目录下。可以通过逗号来指定多个路径, kafka会根据最少被使用的原则选择目录分配新的parition。需要注意的是,kafka在分配parition的时候选择的规则不是按照磁盘的空间大小来定的,而是根据分配的 parition的个数多小而定。
  • num.partitions:这个参数用于设置新创建的topic有多少个分区,可以根据消费者实际情况配置,配置过小会影响消费性能。这里配置6个。
  • log.retention.hours:这个参数用于配置kafka中消息保存的时间,还支持log.retention.minutes和 log.retention.ms配置项。这三个参数都会控制删除过期数据的时间,推荐使用log.retention.ms。如果多个同时设置,那么会选择最小的那个。
  • log.segment.bytes:配置partition中每个segment数据文件的大小,默认是1GB,超过这个大小会自动创建一个新的segment file。
  • zookeeper.connect:这个参数用于指定zookeeper所在的地址,它存储了broker的元信息。 这个值可以通过逗号设置多个值,每个值的格式均为:hostname:port/path,每个部分的含义如下:
    • hostname:表示zookeeper服务器的主机名或者IP地址,这里设置为IP地址。
    • port: 表示是zookeeper服务器监听连接的端口号。
    • /path:表示kafka在zookeeper上的根目录。如果不设置,会使用根目录。
  • auto.create.topics.enable:这个参数用于设置是否自动创建topic,如果请求一个topic时发现还没有创建, kafka会在broker上自动创建一个topic,如果需要严格的控制topic的创建,那么可以设置auto.create.topics.enable为false,禁止自动创建topic。
  • delete.topic.enable:在0.8.2版本之后,Kafka提供了删除topic的功能,但是默认并不会直接将topic数据物理删除。如果要从物理上删除(即删除topic后,数据文件也会一同删除),就需要设置此配置项为true。

    设置环境变量

    $ vim /etc/profile
    export KAFKA_HOME=/usr/local/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    #生效
    $ . /etc/profile

    启动脚本

    $ vim /usr/lib/systemd/system/kafka.service
    [Unit]
    Description=Apache Kafka server (broker)
    After=network.target  zookeeper.service
    [Service]
    Type=simple
    User=root
    Group=root
    ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
    ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
    Restart=on-failure
    [Install]
    WantedBy=multi-user.target
    $ systemctl daemon-reload

    启动kafka

    在启动kafka集群前,需要确保ZooKeeper集群已经正常启动。接着,依次在kafka各个节点上执行如下命令即可。这里将kafka放到后台运行,启动后,会在启动kafka的当前目录下生成一个nohup.out文件,可通过此文件查看kafka的启动和运行状态。通过jps指令,可以看到有个Kafka标识,这是kafka进程成功启动的标志。

    $ cd /usr/local/kafka
    $ nohup bin/kafka-server-start.sh config/server.properties &
    # 或者
    $ systemctl start kafka
    $ jps
    21840 Kafka
    15593 Jps
    15789 QuorumPeerMain

    Kafka相关知识

    下图展示了Kafka的相关术语以及之间的关系:

    Kafka调试,第1张

            上图中一个topic配置了3个partition。Partition1有两个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。

            如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。
    broker

            Kafka 集群包含一个或多个服务器,服务器节点称为broker。

            broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。

            如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。

            如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。

    Topic

            每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

    类似于数据库的表名

    Partition

            topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

    Producer

            生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。

    Consumer

            消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

    Consumer Group

            每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

    Leader

            每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。

    Follower

            Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

    相关参考链接:Kafka详解(包括kafka集群搭建)-CSDN博客

    https://www.cnblogs.com/duanxz/p/4492870.html

    Java kakfa配置application.yml

    spring:
      kafka:
        bootstrap-servers: 10.0.0.6:9092
        producer:
          acks: all
          retries: 0
          batch-size: 16384
          buffer-memory: 33554432
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
          properties:
            linger.ms: 1

    pom.xml依赖

    
    	org.springframework.cloud
    	spring-cloud-starter-bus-kafka
    	
    	    
    	        org.springframework.integration
    	        spring-integration-kafka
    	    
    	
    
    
    	org.springframework.integration
    	spring-integration-kafka
    	${spring-integration.version}
    

    发送kafka消息工具类实现 

    package com.test.util;
    /*发送kafka消息工具类*/
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    import com.test.common.util.JsonUtil;
    import lombok.extern.slf4j.Slf4j;
    @Slf4j
    @Component
    @EnableKafka
    public class KafkaSender {
    	@Autowired(required = false)
    	private KafkaTemplate template;
    	@Value("${kafka.debug:true}")
    	private boolean kafka_send_debug;
    	public void sendMsgAsyncAndLog(String topic, Object msg) {
    		if (kafka_send_debug) {
    			log.info("sendMsgAsyncAndLog to {} msg: {}", topic, JsonUtil.objToStr(msg));
    		}
    		sendMsgAsync(topic, msg, new ListenableFutureCallback>() {
    			@Override
    			public void onSuccess(SendResult result) {
    				log.info("sendMsgAysnc suc! msg:{}", msg);
    			}
    			@Override
    			public void onFailure(Throwable ex) {
    				log.error(ex.getMessage(), ex);
    				log.error("sendMsgAysnc error! msg:{}", msg);
    			}
    		});
    	}
    	public void sendMsgAsync(String topic, Object msg, ListenableFutureCallback> callback) {
    		if (this.template != null) {
    			ListenableFuture> future = this.template.send(topic, msg);
    			future.addCallback(callback);
    		}
    	}
    }
    

     监听topic并消费示例

    package com.receive.test.listener;
    /*监听kafka topic消息并消费*/
    import java.util.Optional;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    import com.test.route.RouteDeal;
    import lombok.extern.slf4j.Slf4j;
    @Slf4j
    @Component
    public class TestKafkaListener {
    	@Autowired
    	private RouteDeal routeDeal;
    	@KafkaListener(topics = { "topic.test_topic" })
    	public void kakfaListenerDeal(ConsumerRecord record) {
    		try {
    			Optional kafkaMessage = Optional.ofNullable(record.value());
    			if (kafkaMessage.isPresent()) {
    				String message = kafkaMessage.get();
    				if (Boolean.getBoolean("debug_log")) {
    					log.info("receive log: {}", message);
    				}
    				routeDeal.putEle(message);
    			}
    		} catch (Exception e) {
    			log.error(e.getMessage(), e);
    		}
    	}
    }

    Kafka调试

    kefka提供了多个命令用于查看、创建、修改、删除topic信息,也可以通过命令测试如何生产消息、消费消息等,这些命令位于kafka安装目录的bin目录下,这里是/usr/local/kafka/bin,包括createKafkaOnly.sh  createKafka.sh      deleteKafka.sh 创建与删除topic。

    登录任意一台kafka集群节点,切换到此目录下,即可进行命令操作。

    下面列举kafka的一些常用命令的使用方法。

    (1)显示topic列表

    #kafka-topics.sh  --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --list
    $ kafka-topics.sh  --zookeeper 10.0.0.6:2181 --list
    topic123
    

    (2)创建一个topic,并指定topic属性(副本数、分区数等)

    #kafka-topics.sh --create --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --replication-factor 1 --partitions 3 --topic topic123 
    $ kafka-topics.sh --create --zookeeper 10.0.0.6:2181 --replication-factor 1 --partitions 3 --topic topic123
    Created topic topic123.
    #--replication-factor表示指定副本的个数
    

    (3)查看某个topic的状态

    #kafka-topics.sh --describe --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123
    $ kafka-topics.sh --describe --zookeeper 10.0.0.6:2181 --topic topic123
    Topic: topic123	PartitionCount: 3	ReplicationFactor: 1	Configs: 
    	Topic: topic123	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
    	Topic: topic123	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
    	Topic: topic123	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
    

    (4)生产消息 阻塞状态

    #kafka-console-producer.sh --broker-list 10.0.0.6:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123
    $ kafka-console-producer.sh --broker-list 10.0.0.6:9092 --topic topic123
    

    (5)消费消息 阻塞状态

    #kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123
    $ kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092 --topic topic123
    #从头开始消费消息
    #kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092 --topic topic123 --from-beginning
    $ kafka-console-consumer.sh --bootstrap-server 10.0.0.6:9092,10.0.0.7:9092,10.0.0.8:9092 --topic topic123 --from-beginning
    

    (6)删除topic

    #kafka-topics.sh --delete --zookeeper 10.0.0.6:2181,10.0.0.7:2181,10.0.0.8:2181 --topic topic123
    $ kafka-topics.sh --delete --zookeeper 10.0.0.6:2181 --topic topic123

      (7)其他

    #kafka查看topic里n条消息
    sh kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic topic_name  --from-beginning --max-messages n
    #模拟发送数据
    /home/kafka/software/kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topic_name
    #消费数据
    /home/kafka/software/kafka/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic_name  --from-beginning 
    #查看topic列表 
    /home/kafka/software/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

        kafka启动常见问题

    Kafka调试,第2张

            1.端口被占用Socket server failed to bind to 0.0.0.0:9092: Address already in use. 

            解决办法:netstat -nltp | grep 9092 找到并kill掉对应的进程,重新启动kafka进程。

            2.配置文件错误,如示例将log.retention.hours的值配置成了字母,实际应该是数值类型,导致kafka无法启动

    Kafka调试,第3张

    解决办法:修改错误的配置,重新启动kafka进程

            3.节点已注册

            从kafka的server.log日志中发现如下信息,这种方式一般是因为磁盘问题,回想下是否进行过磁盘的重分配,将原来其它节点的磁盘,分配给了现在的节点,这种现象一般发生在单机版kafka多broker节点的场景

            此时如果将磁盘分配还原成最开始的分配方式并启动kafka进程后,会出现下面第二张图的问题,出现了同一个kafka主题分区有两个目录的情况

    Kafka调试,第4张

    解决办法:如果是初次部署,还没有流量进来,先卸载kafka节点,再重新部署;

                      其它情况,先停kafka服务,删除两个相同的目录中的一个,只保留其中一个目录即可,然后重启kafka进程

            4. 当日志出现类似于这样的ERROR信息时,可能是出现了脏副本

    Exiting because log truncation is not allowed for partition test01-1, current leader's  latest offset 28025 is less than replica's latest offset 28402

    出现这样的错误信息后,kafka启动会失败,这时,server.properties文件中添加参数unclean.leader.election.enable=true,启动kafka服务即可以启动

                5. 当出现下面的报错时,需要检查目录的权限

            FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.io.IOException: Permission denied

    Kafka调试,第5张

            6. 当出现下面的报错时,需要检查磁盘问题

            org.apache.kafka.common.KafkaException: java.io.IOException: Input/output error