Kafka Assistant 是一款 Kafka GUI 管理工具——管理Broker,Topic,Group、查看消费详情、监控服务器状态、支持多种消息格式。
你需要将 advertised.listeners(如果你使用Docker镜像,则为 KAFKA_ADVERTISED_LISTENERS)设置为外部地址(host/IP),以便客户端可以正确地连接到它。否则,他们会尝试连接到内部主机地址–如果无法到达,问题就会接踵而来。
换一种说法,由Spencer Ruport提供。
listeners 是Kafka所绑定的接口。advertised.listeners 是客户端的连接方式。
在这篇文章中,我将谈论为什么这是有必要的配置 listeners 和 advertised.listeners,然后展示如何基于几个场景–Docker和AWS来做。
Apache Kafka是一个分布式系统。数据是从一个给定的分区的领导者那里读取和写入的,这个领导者可以是集群中的任何一个Broker。当一个客户端(生产者/消费者)启动时,它将请求关于哪个broker是一个分区的领导者的元数据–它可以从任何broker那里做到这一点。返回的元数据将包括该分区的领导者broker的可用端点,然后客户端将使用这些端点连接到broker,根据需要读/写数据。
正是这些端点给人们带来了麻烦。在单机上,运行裸机(没有虚拟机,没有Docker),可能只需要使用主机名(或只是localhost),这很容易。但是,一旦你进入更复杂的网络设置和多节点,你就必须更加注意。
让我们假设你有一个以上的网络。比如:
你需要告诉Kafka Broker如何相互联系,但也要确保外部客户端(生产者/消费者)可以联系到他们需要联系的Broker。
最关键的是,当你运行一个客户端时,你传递给它的Broker只是它要去的地方,并从那里获得集群中Broker的元数据。它为读/写数据而连接的实际主机和IP是基于Broker在初始连接中传递回来的数据–即使它只是一个单一的节点,而且返回的Broker与它所连接的Broker相同。
为了正确配置,你需要了解Kafka Broker可以有多个监听器。一个监听器是一个组合:
让我们来看看一些配置。通常情况下,协议也被用于监听器的名称,但在这里,让我们通过使用监听器的抽象名称来使它变得漂亮和清晰。
KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092 KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB
我使用的是Docker配置名称–如果你直接配置 server.properties(例如,在AWS等),其对应名称在以下列表中缩进显示。
Kafka Broker之间的通信,通常是在内部网络(如Docker网络,AWS VPC等)。要定义使用哪个监听器,请指定 KAFKA_INTER_BROKER_LISTENER_NAME(inter.broker.listener.name)。使用的主机/IP必须是其他人可以从Broker机器上访问的。
Kafka客户端很可能不在Broker的网络中,这就是额外监听器的作用。
每个监听器在被连接到时,都会报告它可以被接连到的地址。你到达Broker的地址取决于所使用的网络。如果你从内部网络连接到经纪人,它将是一个不同于外部连接的主机/IP。
当连接到Broker时,返回给客户端的监听器将是你连接到的监听器(基于端口)。
kafkacat是一个探索这个问题的有用工具。使用 -L,你可以看到你所连接的监听器的元数据。基于上述相同的监听器配置(LISTENER_BOB/LISTENER_FRED),注意观察返回的结果:
$ kafkacat -b kafka0:9092 \ -L Metadata for all topics (from broker -1: kafka0:9092/bootstrap): 1 brokers: broker 0 at localhost:9092
$ kafkacat -b kafka0:29092 \ -L Metadata for all topics (from broker 0: kafka0:29092/0): 1 brokers: broker 0 at kafka0:29092
你也可以用tcpdump来检查连接到Broker服务器的客户端的流量,并发现从代理服务器返回的主机名。
即使你能与Broker建立初始连接,在元数据中返回的地址仍然可能是一个你的客户端无法访问的主机名。
让我们一步一步来分析。
$ nc -vz ec2-54-191-84-122.us-west-2.compute.amazonaws.com 9092 found 0 associations found 1 connections: 1: flags=82outif utun5 src 172.27.230.23 port 53352 dst 54.191.84.122 port 9092 rank info not available TCP aux info available
我们连上了9092端口,继续
echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test
这一步发生了什么:
$ echo "test"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test >>[2022-07-30 15:08:41,932] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1547 ms has passed since batch creation plus linger time
$ echo "foo"|kafka-console-producer --broker-list ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test >> $ kafka-console-consumer --bootstrap-server ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 --topic test --from-beginning foo
它工作得很好! 这是因为我们正在连接到 9092 端口,它被配置为内部监听器,因此报告其主机名为 ip-172-31-18-160.us-west-2.compute.internal,这可以从Broker机器上解析(因为这是它自己的主机名!)。
$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -L Metadata for all topics (from broker -1: ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092/bootstrap): 1 brokers: broker 0 at ip-172-31-18-160.us-west-2.compute.internal:9092
很明显,内部主机名被返回。这也使得这个看似混乱的错误变得更有意义–连接到一个主机名,在另一个主机上得到一个查询错误。
$ kafkacat -b ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 -C -t test % ERROR: Local: Host resolution failure: ip-172-31-18-160.us-west-2.compute.internal:9092/0: Failed to resolve 'ip-172-31-18-160.us-west-2.compute.internal:9092': nodename nor servname provided, or not known
这里,我们在本地机器上使用消费者模式(-C)的kafkacat来尝试从主题中读取。和以前一样,因为我们从元数据中的Broker那里得到了内部监听器的主机名,所以客户端无法解析这个主机名来进行读/写。
我看到一个Stack Overflow的答案,建议直接更新我的hosts文件…这不是更简单吗?
这可以解决问题,而不是真正修复它。
如果Broker服务器报告了一个客户端无法连接的主机名,那么在本地 /etc/hosts 中硬编码主机名/IP组合可能看起来是一个很好的修复。但这是一个非常脆弱的、手动的解决方案。当IP发生变化时,当你移动主机而忘记带配置hosts时,以及当其他人想做同样的事情时,会发生什么?
了解并实际修复你的网络的 advertised.listeners 设置要好得多。
为了在Docker中运行,你需要为Kafka配置两个监听器。
**Docker网络内的通信。**这可能是Broker之间的通信以及在Docker中运行的其他组件之间的通信,如Kafka Connect或第三方客户端或生产者。对于这些通信,我们需要使用Docker容器的主机名。同一Docker网络上的每个Docker容器将使用Kafka Broker容器的主机名来到达它。
**非Docker网络流量。**这可能是在Docker主机上本地运行的客户端,例如。假设他们将在localhost上连接到从Docker容器暴露出来的端口。这是Docker Compose的片段。
我命名AWS是因为它是大多数人使用的,但这适用于任何IaaS/云解决方案。
这里适用的概念与Docker完全相同。主要的区别是,在Docker中,外部连接很可能只是在localhost上(如上),而在云托管的Kafka中(如AWS上),外部连接将来自非本地的机器。
另一个复杂的问题是,虽然Docker网络与主机的网络严重隔离,但在IaaS上,外部主机名往往是可以在内部解析的,这使得你可能真正遇到这些问题时,一发不可收拾。
有两种方法,取决于你要连接到Broker服务器的外部地址是否也可以在网络(如VPC)上的所有Broker服务器上进行本地解析。
在这里,你可以用一个监听器搞定。现有的监听器叫 PLAINTEXT,只需要设置 advertised.listeners(即传递给入站客户的那个)。
advertised.listeners=PLAINTEXT://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092
现在内部和外部的连接都将使用 ec2-54-191-84-122.us-west-2.compute.amazonaws.com 进行连接。因为 ec2-54-191-84-122.us-west-2.com compute.amazonaws.com 既可以在本地也可以在外部解决,所以事情进展顺利。
你将需要为Kafka配置两个监听器。
下面是一个例子:
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092 listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT advertised.listeners=INTERNAL://ip-172-31-18-160.us-west-2.compute.internal:19092,EXTERNAL://ec2-54-191-84-122.us-west-2.compute.amazonaws.com:9092 inter.broker.listener.name=INTERNAL
原文地址: