大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
08-Kafka-Python编程
Kafka
Python
2022-12-06 14:25:03
24
0
0
bigdata
Kafka
Python
kafka-python 文档:[KafkaConsumer - kafka-python 2.0.2-dev documentation](https://link.zhihu.com/?target=https%3A//kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html) # 基本概念 * Topic:一组消息数据的标记符; * Producer:生产者,用于生产数据,可将生产后的消息送入指定的 Topic; * Consumer:消费者,获取数据,可消费指定的 Topic; * Group:消费者组,同一个 group 可以有多个消费者,一条消息在一个 group 中,只会被一个消费者获取; * Partition:分区,为了保证 kafka 的吞吐量,一个 Topic 可以设置多个分区。同一分区只能被一个消费者订阅。 # 启动Zookeeper与Kafka 在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建`nbu`的主题。 ```bash #启动zookeeper nbu@ecs:~$/usr/local/zookeeper/bin/zkServer.sh start #启动kafka nbu@ecs:~$ nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & #创建nbu主题 nbu@ecs:~$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic nbu #创建文件夹`~/bigdata/kafka/kafka-demo-python` nbu@ecs:~$ mkdir ~/bigdata/kafka/kafka-demo-python nbu@ecs:~$ cd ~/bigdata/kafka/kafka-demo-python #创建SimpleProducer.java文件 nbu@ecs:~/bigdata/kafka/kafka-demo-python$ vim Producer-python.py ``` # Producer生产者 输入如下代码: ```python from kafka import KafkaProducer from kafka.errors import kafka_errors import traceback import json def producer_demo(): # 假设生产的消息为键值对(不是一定要键值对),且序列化方式为json producer = KafkaProducer( bootstrap_servers=['localhost:9092'], key_serializer=lambda k: json.dumps(k).encode(), value_serializer=lambda v: json.dumps(v).encode()) # 发送三条消息 for i in range(0, 3): future = producer.send( 'nbu', key='count_num', # 同一个key值,会被送至同一个分区 value=str(i)) # 向分区1发送消息 print("send {}".format(str(i))) try: future.get(timeout=10) # 监控是否发送成功 except kafka_errors: # 发送失败抛出kafka_errors traceback.format_exc() if __name__ == '__main__': producer_demo() ``` ```bash # 运行程序 nbu@ecs:~/bigdata/kafka/kafka-demo-python$ python Producer-python.py ```  > 若出现 > ```bash Traceback (most recent call last): File "Producer-python.py", line 1, in <module> from kafka import KafkaProducer ModuleNotFoundError: No module named 'kafka' > ``` > 需要安装kafka > ```bash > nbu@ecs:~$ pip3 install kafka-python > ``` # Consumer消费者 在`~/bigdata/kafka/kafka-demo-python`文件夹下创建Consumer-python.py ```bash nbu@ecs:~/bigdata/kafka/kafka-demo-python$ vim Consumer-python.py ``` 输入如下代码: ```python from kafka import KafkaConsumer import json def consumer_demo(): consumer = KafkaConsumer( 'nbu', bootstrap_servers='localhost:9092', group_id='test_id' ) for message in consumer: print("receive, key: {}, value: {}".format( json.loads(message.key.decode()), json.loads(message.value.decode()) ) ) if __name__ == '__main__': consumer_demo() ``` ```bash # 运行程序 nbu@ecs:~/bigdata/kafka/kafka-demo-python$ python Consumer-python.py ``` 另启动一个终端,向`nbu`主题中发送数据 ```bash nbu@ecs:~/bigdata/kafka/kafka-demo-python$ python Producer-python.py ``` 消费者终端显示如下:  # 消费者进阶操作 ## 初始化参数 列举一些 KafkaConsumer 初始化时的重要参数: * group_id 高并发量,则需要有多个消费者协作,消费进度,则由 group_id 统一。例如消费者 A 与消费者 B,在初始化时使用同一个 group_id。在进行消费时,一条消息被消费者 A 消费后,在 kafka 中会被标记,这条消息不会再被 B 消费(前提是 A 消费后正确 commit)。 * key_deserializer, value_deserializer 与生产者中的参数一致,自动解析。 * auto_offset_reset 消费者启动的时刻,消息队列中或许已经有堆积的未消费消息,有时候需求是从上一次未消费的位置开始读(则该参数设置为 earliest),有时候的需求为从当前时刻开始读之后产生的,之前产生的数据不再消费(则该参数设置为 latest)。 * enable_auto_commit, auto_commit_interval_ms 是否自动 commit,当前消费者消费完该数据后,需要 commit,才可以将消费完的信息传回消息队列的控制中心。enable_auto_commit 设置为 True 后,消费者将自动 commit,并且两次 commit 的时间间隔为 auto_commit_interval_ms。 ## 手动 commit ``` def consumer_demo(): consumer = KafkaConsumer( 'kafka_demo', bootstrap_servers='localhost:9092', group_id='test_id', enable_auto_commit=False ) for message in consumer: print("receive, key: {}, value: {}".format( json.loads(message.key.decode()), json.loads(message.value.decode()) ) ) consumer.commit() ``` ## 查看 kafka 堆积剩余量 在线环境中,需要保证消费者的消费速度大于生产者的生产速度,所以需要检测 kafka 中的剩余堆积量是在增加还是减小。可以用如下代码,观测队列消息剩余量: ``` consumer = KafkaConsumer(topic, **kwargs) partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)] print("start to cal offset:") # total toff = consumer.end_offsets(partitions) toff = [(key.partition, toff[key]) for key in toff.keys()] toff.sort() print("total offset: {}".format(str(toff))) # current coff = [(x.partition, consumer.committed(x)) for x in partitions] coff.sort() print("current offset: {}".format(str(coff))) # cal sum and left toff_sum = sum([x[1] for x in toff]) cur_sum = sum([x[1] for x in coff if x[1] is not None]) left_sum = toff_sum - cur_sum print("kafka left: {}".format(left_sum)) ```
上一篇:
08-InfluxDB数据库管理
下一篇:
08-MongoDB 索引
文档导航