我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Python 使用python-kafk

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Python 使用python-kafk

使用python-kafka类库开发kafka生产者&消费者&客户端

  By: 授客 QQ:1033553122

 

 

 

1.测试环境

python 3.4

 

zookeeper-3.4.13.tar.gz

下载地址1:

http://zookeeper.apache.org/releases.html#download

https://www.apache.org/dyn/closer.cgi/zookeeper/

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

 

下载地址2:

https://pan.baidu.com/s/1dnBgHvySE9pVRZXJVmezyQ

 

kafka_2.12-2.1.0.tgz

下载地址1:

http://kafka.apache.org/downloads.html

下载地址2:

https://pan.baidu.com/s/1VnHkJgy4iQ73j5rLbEL0jw

 

pip-18.1.tar.gz

下载地址:https://pan.baidu.com/s/1VpYk8JvMuztzbvEF8mQoRw

说明:实践中发现,pip版本比较旧的话,没法安装whl文件

 

kafka_python-1.4.4-py2.py3-none-any.whl

下载地址1:

https://pypi.org/project/kafka-python/#files

https://files.pythonhosted.org/packages/5f/89/f13d9b1f32cc37168788215a7ad1e4c133915f6853660a447660393b577d/kafka_python-1.4.4-py2.py3-none-any.whl

 

下载地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

python_snappy-0.5.3-cp34-cp34m-win_amd64.whl

下载地址1:

https://www.lfd.uci.edu/~gohlke/pythonlibs/

 

下载地址2:

https://pan.baidu.com/s/10XtLXESp64NtwA73RbryVg

 

 

说明:

kafka-python支持gzip压缩/解压缩。如果要消费lz4方式压缩的消息,则需要安装python-lz4,如果要支持snappy方式压缩/解压缩则需要安装,否则可能会报错:kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for snappy compression codec not found.

构建生产者对象时,可通过compression_type 参数指定由对应生产者生产的消息数据的压缩方式,或者在producer.properties配置中配置compression.type参数。

 

参考链接:

https://pypi.org/project/kafka-python/#description

https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install

 

2.代码实践

生产者

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

 

from kafka import KafkaProducer

import json

 

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

 

 

for i in range(0, 100):

    producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)

 

# Block直到单条消息发送完或者超时

future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg')

result = future.get(timeout=60)

print(result)

 

# Block直到所有阻塞的消息发送到网络

# 注意: 该操作不保证传输或者消息发送成功,仅在配置了linger_ms的情况下有用。(It is really only useful if you configure internal batching using linger_ms

 

 

# 序列化json数据

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('MY_TOPIC1', {'shouke':'kafka'})

 

# 序列化字符串key

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode)

producer.send('MY_TOPIC1', b'shouke', key='strKey')

 

producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip')

for i in range(2):

    producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))

 

# 消息记录携带header

producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])

 

# 获取性能数据(注意,实践发现分区较多的情况下,该操作比较耗时

metrics = producer.metrics()

print(metrics)

 

producer.flush()

 

实践中遇到错误: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解决方案如下:

进入到配置目录(config),编辑server.properties文件,

查找并设置listener,配置监听端口,格式:listeners = listener_name://host_name:port,供kafka客户端连接用的ip和端口,例中配置如下:

listeners=PLAINTEXT://127.0.0.1:9092

 

 

API及常用参数说明:

class kafka.KafkaProducer(**configs)

bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)

 

key_serializer (可调用对象) –用于转换用户提供的key值为字节,必须返回字节数据。 如果为None,则等同调用f(key)。 默认值: None.

 

value_serializer(可调用对象) – 用于转换用户提供的value消息值为字节,必须返回字节数据。 如果为None,则等同调用f(value)。 默认值: None.

 

send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

topic(str) – 设置消息将要发布到的主题,即消息所属主题

 

value(可选) – 消息内容,必须为字节数据,或者通过value_serializer序列化后的字节数据。如果为None,则key必填,消息等同于“删除”。( If value is None, key is required and message acts as a ‘delete’)

 

partition (int, 可选) – 指定分区。如果未设置,则使用配置的partitioner

 

key (可选) – 和消息对应的key,可用于决定消息发送到哪个分区。如果平partition为None,则相同key的消息会被发布到相同分区(但是如果key为None,则随机选取分区)(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必须为字节数据或者通过配置的key_serializer序列化后的字节数据.

 

headers (可选) – 设置消息header,header-value键值对表示的list。list项为元组:格式 (str_header,bytes_value)

 

timestamp_ms (int, 可选) –毫秒数 (从1970 1月1日 UTC算起) ,作为消息时间戳。默认为当前时间

 

函数返回FutureRecordMetadata类型的RecordMetadata数据

 

flush(timeout=None)

发送所有可以立即获取的缓冲消息(即时linger_ms大于0),线程block直到这些记录发送完成。当一个线程等待flush调用完成而block时,其它线程可以继续发送消息。

 

注意:flush调用不保证记录发送成功

 

metrics(raw=False)

获取生产者性能指标。

 

参考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

 

注:生产者代码是线程安全的,支持多线程,而消费者则不然

 

消费者

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

from kafka import KafkaConsumer

from kafka import TopicPartition

import json

 


consumer = KafkaConsumer('MY_TOPIC1',
                         bootstrap_servers=['127.0.0.1:9092'],
                         #auto_offset_reset='',
                         auto_offset_reset='latest',# 消费kafka中最近的数据,如果设置为earliest则消费最早的数据,不管这些数据是否消费
                         enable_auto_commit=True, # 自动提交消费者的offset
                         auto_commit_interval_ms=3000, ## 自动提交消费者offset的时间间隔
                         group_id='MY_GROUP1',
                         consumer_timeout_ms= 10000, # 如果10秒内kafka中没有可供消费的数据,自动退出
                         client_id='consumer-python3'
                         )

 

for msg in consumer:

    print (msg)

    print('topic: ', msg.topic)

    print('partition: ', msg.partition)

    print('key: ', msg.key, 'value: ', msg.value)

    print('offset:', msg.offset)

    print('headers:', msg.headers)

 

# Get consumer metrics

metrics = consumer.metrics()

print(metrics)

 

运行效果

 

 

 

通过assign、subscribe两者之一为消费者设置消费的主题

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],

                         auto_offset_reset='latest',

                         enable_auto_commit=True, # 自动提交消费数据的offset

                         consumer_timeout_ms= 10000, # 如果1秒内kafka中没有可供消费的数据,自动退出

                         value_deserializer=lambda m: json.loads(m.decode('ascii')), #消费json 格式的消息

                         client_id='consumer-python3'

                         )

 

 

# consumer.assign([TopicPartition('MY_TOPIC1', 0)])

# msg = next(consumer)

# print(msg)

 

consumer.subscribe('MY_TOPIC1')

for msg in consumer:

    print (msg)

 

 

API及常用参数说明:

class kafka.KafkaConsumer(*topics, **configs)

*topics (str) – 可选,设置需要订阅的topic,如果未设置,需要在消费记录前调用subscribe或者assign。

 

client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’

 

group_id (str or None) – 消费组名称。如果为None,则通过group coordinator auto-partition分区分配,offset提交被禁用。默认为None

 

auto_offset_reset (str) – 重置offset策略: 'earliest'将移动到最老的可用消息, 'latest'将移动到最近消息。 设置为其它任何值将抛出异常。默认值:'latest'。

 

enable_auto_commit (bool) –  如果为True,将自动定时提交消费者offset。默认为True。

 

auto_commit_interval_ms (int) – 自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true,默认值为: 5000。

 

value_deserializer(可调用对象) - 携带原始消息value并返回反序列化后的value

 

subscribe(topics=(), pattern=None, listener=None)

订阅需要的主题

topics (list) – 需要订阅的主题列表

pattern (str) – 用于匹配可用主题的模式,即正则表达式。注意:必须提供topics、pattern两者参数之一,但不能同时提供两者。

 

metrics(raw=False)

获取消费者性能指标。

 

参考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

 

客户端

#-*- encoding:utf-8 -*-

 

__author__ = 'shouke'

 

from kafka.client import KafkaClient

 

client = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)

 

# 获取所有broker

brokers = client.cluster.brokers()

for broker in brokers:

    print('broker: ', broker)

    print('broker nodeId: ', broker.nodeId)

 

# 获取主题的所有分区

topic = 'MY_TOPIC1'

partitions = client.cluster.available_partitions_for_topic(topic)

print(partitions)

 

partition_dict = {}

partition_dict[topic] = [partition for partition in partitions]

print(partition_dict)

 

 

运行结果:

broker:  BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)

broker nodeId:  0

{0}

{'MY_TOPIC1': [0]}

 

API及常用参数说明:

class kafka.client.KafkaClient(**configs)

bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'组成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host为broker(Broker:缓存代理,Kafka集群中的单台服务器)地址,默认值为 localhost, port默认值为9092,这里可以不用填写所有broker的host和port,但必须保证至少有一个broker)

 

client_id (str) – 客户端名称,默认值: ‘kafka-python-{version}’

 

request_timeout_ms (int) – 客户端请求超时时间,单位毫秒。默认值: 30000.

 

参考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

 

brokers()

获取所有broker元数据

 

available_partitions_for_topic(topic)

返回主题的所有分区

 

 

参考API: https://kafka-python.readthedocs.io/en/master/apidoc/ClusterMetadata.html

 

 

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Python 使用python-kafk

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Python 使用python-kafk

使用python-kafka类库开发kafka生产者&消费者&客户端  By: 授客 QQ:1033553122   1.测试环境python 3.4 zookeeper-3.4.13.tar.gz下载地址1:http://zookeepe
2023-01-30

为什么使用Python, Python应

-------------------------------------python 因为通用(General-purpose) 所以什么能做。。。问在领域优势?就是想知道py在已经成熟的领域的优势?几乎没有。已知的成熟领域有许多专门
2023-01-31

Python基础-Python基础使用

上篇文章 Python基础-初识Python 我们已经知道了什么是Python,Python的用处、和Python的解释器、Python的安装,这篇文章,我们主要讲Python的使用入门本文防盗链:http://python789.blog
2023-01-31

python使用pymsql

pymsql是Python中操作MySQL的模块,其使用方法和MySQLdb几乎相同。1、执行SQL#!/usr/bin/env python# -*- coding:utf-8 -*-import pymysql# 创建连接conn =
2023-01-31

python pysnmp使用

SNMP标准引入一组ASN.1语言元素,称之为SMI(Structure of ManagementInformation)。由SMI描述的相互关联的被管对象(Managed Objects)组成MIB(ManagementInformat
2023-01-31

python 使用sqlite3

Sqlite是一个轻量级的数据库,类似于Access.一、安装Python 2.5开始提供了对sqlite的支持,带有sqlite3库.没有sqlite的版本需要去PySqlite主页上下载安装包.PySqlite下载地址http://co
2023-01-31

python paramiko 使用

python 下的paramiko功能很之强大。我们所熟悉的Ansible就是用paramiko编写。paramiko主要是通过ssh协议对远程主机的管理。比如:执行远程主机的CLI、上传和下载文件等。1.通过使用paramiko 远程管理
2023-01-31

python PIL.Image使用

一、 基本概念通道每张图片由一个或多个通道构成RGB图像为例,每张图片由3个通道构成,即R通道,G通道,B通道。对于灰度图像,则只有一个通道。获取图像的通道数量和名称,可以由方法PIL.Image.getbands()获取,此方法返回一个
2023-01-31

Python使用cookie

为什么要使用Cookie呢?Cookie,指某些网站为了辨别用户身份、进行session跟踪而储存在用户本地终端上的数据(通常经过加密)比如说有些网站需要登录后才能访问某个页面,在登录之前,你想抓取某个页面内容是不允许的。那么我们可以利用U
2023-01-31

python使用setsockopt

默认的socket选项不够用的时候,就必须要使用setsockopt来调整。就是使用setsockopt。首先看一下socket的setsockopt 函数的定义:socket.setsockopt(level,optname,value)
2023-01-31

Python 使用 matplotlib

3D图形在数据分析、数据建模、图形和图像处理等领域中都有着广泛的应用,下面将给大家介绍一下如何在Python中使用 matplotlib进行3D图形的绘制,包括3D散点、3D表面、3D轮廓、3D直线(曲线)以及3D文字等的绘制。准备工作:p
2023-01-31

Python Urllib2使用

Python Urllib2使用我们先看下Python对于Urllib2的解释:Intro代码  urllib2:An extensible library for opening URLs using a variety of proto
2023-01-31

python gettext使用

python中使用gettext进行语言国际化的方法1.编辑源代码, 保存为gettextdemo.pyimport gettextcatalogs = gettext.find("example", localedir="locale",
2023-01-31

python flush使用

在python中 要达到每次输出刷新上次的效果,使用sys.stdout.flush()可实现。如下简单事例import sysfrom time import sleepfor i in range(13,1,-1):    print
2023-01-31

python getopt使用

“hp:i:”短格式 --- ,["help","ip=","port="]长格式 --- ,,,比如 [('-i','127.0.0.1'),('-p','80')] ; ,包含那些‘-’或‘--’的参数,比如:['55','66']加号
2023-01-31

python paramiko使用

http://www.lag.net/paramiko/Working with paramiko SSHClient is the main class provided by the paramkio module. It provid
2023-01-31

Python——使用ElementTre

XML内容如下:
2023-01-31

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录