pip install pykafka
from pykafka import KafkaClient def get_kafka_producer(hosts, topics): client = KafkaClient(hosts=hosts) print(client.topics) topic = client.topics[topics] producer = topic.get_producer() return producer
hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092' topics = "test_kafka_topic" producer = get_kafka_producer(hosts, topics) for i in range(10): msg = "test message " + str(i) # msg = bytes(msg, encoding='utf-8') # producer.produce(msg) producer.produce(msg.encode()) producer.stop()
def get_kafka_consumer(hosts, topics): client = KafkaClient(hosts=hosts) topic=client.topics[topics] consumer = topic.get_balanced_consumer(consumer_group='test_kafka_topic', auto_commit_enable=True, zookeeper_connect='192.168.20.201:2181,192.168.20.202:2181,192.168.20.203:2181', managed=True, consumer_timeout_ms=1000) # managed=True,即使用新式reblance分区方法,不需zk;managed=False则需通过zk来实现reblance return consumer
测试
hosts = '192.168.20.203:9092,192.168.20.204:9092,192.168.20.205:9092' topics = "test_kafka_topic" consumer = get_kafka_consumer(hosts, topics) for msg in consumer: print(msg) if msg is not None: print(msg.offset) print(msg.value)