Python脚本消费kafka数据
短信预约 -IT技能 免费直播动态提醒
kafka简介(摘自百度百科)
一、简介:
详见:https://blog.csdn.net/Beyond_F4/article/details/80310507
二、安装
详见博客:https://blog.csdn.net/beyond_f4/article/details/80095689
三、按照官网的样例,先跑一个应用
1、生产者:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['172.21.10.136:9092']) #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
for i in range(3):
msg = "msg%d" % i
producer.send('test', msg)
producer.close()
2、消费者(简单demo):
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
启动后生产者、消费者可以正常消费。
3、消费者(消费群组)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
group_id='my-group',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
启动多个消费者,只有其中可以可以消费到,满足要求,消费组可以横向扩展提高处理能力
4、消费者(读取目前最早可读的消息)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
auto_offset_reset='earliest',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
auto_offset_reset:重置偏移量,earliest移到最早的可用消息,latest最新的消息,默认为latest
源码定义:{'smallest': 'earliest', 'largest': 'latest'}
5、消费者(手动设置偏移量)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test',
bootstrap_servers=['172.21.10.136:9092'])
print consumer.partitions_for_topic("test") #获取test主题的分区信息
print consumer.topics() #获取主题列表
print consumer.subscription() #获取当前消费者订阅的主题
print consumer.assignment() #获取当前消费者topic、分区信息
print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic=u'test', partition=0), 5) #重置偏移量,从第5个偏移量消费
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
6、消费者(订阅多个主题)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0')) #订阅要消费的主题
print consumer.topics()
print consumer.position(TopicPartition(topic=u'test', partition=0)) #获取当前主题的最新偏移量
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
7、消费者(手动拉取消息)
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
msg = consumer.poll(timeout_ms=5) #从kafka获取消息
print msg
time.sleep(1)
8、消费者(消息挂起与恢复)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))
num = 0
while True:
print num
print consumer.paused() #获取当前挂起的消费者
msg = consumer.poll(timeout_ms=5)
print msg
time.sleep(2)
num = num + 1
if num == 10:
print "resume..."
consumer.resume(TopicPartition(topic=u'test', partition=0))
print "resume......"
pause执行后,consumer不能读取,直到调用resume后恢复。
如果对您有帮助,记得给我点赞诺
如果对您有帮助,记得给我点赞诺
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341