pykafka/kafka 您所在的位置:网站首页 encode和decode可以一起用吗 pykafka/kafka

pykafka/kafka

2024-01-13 08:38| 来源: 网络整理| 查看: 265

一、简介 python连接kafka的标准库,kafka-python和pykafka。kafka-python使用的人多是比较成熟的库,kafka-python并没有zk的支持。pykafka是Samsa的升级版本,使用samsa连接zookeeper,生产者直接连接kafka服务器列表,消费者才用zookeeper。使用kafka Cluster。

二、pykafka (1) pykafka安装 根据机器环境从以下三种方式中选择进行一种安装pykafka,版本号是2.7.0。

PyPI安装

pip install pykafka

conda安装

conda install -c conda-forge pykafka

anaconda自带pip安装

/root/anaconda3/bin/pip install pykafka (2) pykafka的api

1、http://pykafka.readthedocs.io/en/latest/,https://github.com/Parsely/pykafka 2、在pykafka安装目录site-packages/pykafka/下,直接查看。

(3) pykafka生产者api

#coding=utf-8 import time from pykafka import KafkaClient class KafkaTest(object): """ 测试kafka常用api """ def __init__(self, host="192.168.237.129:9092"): self.host = host self.client = KafkaClient(hosts=self.host) def producer_partition(self, topic): """ 生产者分区查看,主要查看生产消息时offset的变化 :return: """ topic = self.client.topics[topic.encode()] partitions = topic.partitions print (u"查看所有分区 {}".format(partitions)) earliest_offset = topic.earliest_available_offsets() print(u"获取最早可用的offset {}".format(earliest_offset)) # 生产消息之前看看offset last_offset = topic.latest_available_offsets() print(u"最近可用offset {}".format(last_offset)) # 同步生产消息 p = topic.get_producer(sync=True) p.produce(str(time.time()).encode()) # 查看offset的变化 last_offset = topic.latest_available_offsets() print(u"最近可用offset {}".format(last_offset)) def producer_designated_partition(self, topic): """ 往指定分区写消息,如果要控制打印到某个分区, 需要在获取生产者的时候指定选区函数, 并且在生产消息的时候额外指定一个key :return: """ def assign_patition(pid, key): """ 指定特定分区, 这里测试写入第一个分区(id=0) :param pid: 为分区列表 :param key: :return: """ print("为消息分配partition {} {}".format(pid, key)) return pid[0] topic = self.client.topics[topic.encode()] p = topic.get_producer(sync=True, partitioner=assign_patition) p.produce(str(time.time()).encode(), partition_key=b"partition_key_0") def async_produce_message(self, topic): """ 异步生产消息,消息会被推到一个队列里面, 另外一个线程会在队列中消息大小满足一个阈值(min_queued_messages) 或到达一段时间(linger_ms)后统一发送,默认5s :return: """ topic = self.client.topics[topic.encode()] last_offset = topic.latest_available_offsets() print("最近的偏移量 offset {}".format(last_offset)) # 记录最初的偏移量 old_offset = last_offset[0].offset[0] p = topic.get_producer(sync=False, partitioner=lambda pid, key: pid[0]) p.produce(str(time.time()).encode()) s_time = time.time() while True: last_offset = topic.latest_available_offsets() print("最近可用offset {}".format(last_offset)) if last_offset[0].offset[0] != old_offset: e_time = time.time() print('cost time {}'.format(e_time-s_time)) break time.sleep(1) def get_produce_message_report(self, topic): """ 查看异步发送消报告,默认会等待5s后才能获得报告 """ topic = self.client.topics[topic.encode()] last_offset = topic.latest_available_offsets() print("最近的偏移量 offset {}".format(last_offset)) p = topic.get_producer(sync=False, delivery_reports=True, partitioner=lambda pid, key: pid[0]) p.produce(str(time.time()).encode()) s_time = time.time() delivery_report = p.get_delivery_report() e_time = time.time() print ('等待{}s, 递交报告{}'.format(e_time-s_time, delivery_report)) last_offset = topic.latest_available_offsets() print("最近的偏移量 offset {}".format(last_offset)) if __name__ == '__main__': host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092' kafka_ins = KafkaTest(host) topic = 'test' # kafka_ins.producer_partition(topic) # kafka_ins.producer_designated_partition(topic) # kafka_ins.async_produce_message(topic) kafka_ins.get_produce_message_report(topic)

注意要点: 多进程使用pykafka共享一个client,会造成只有进程能够正常的写入数据,如果使用了dliver_report(包括同步),会导致子进程彻底阻塞掉不可用 使用producer.produce发送数据出现故障,如下

#!/bin/env python from pykafka import KafkaClient host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092' client = KafkaClient(hosts = host) topic = client.topics["test"] with topic.get_sync_producer() as producer: for i in range(100): producer.produce('test message ' + str(i ** 2))

报错:

Traceback (most recent call last): File "TaxiKafkaProduce.py", line 15, in producer.produce(('test message ' + str(i ** 2))) File "/root/anaconda3/lib/python3.6/site-packages/pykafka/producer.py", line 325, in produce "got '%s'", type(message)) TypeError: ("Producer.produce accepts a bytes object as message, but it got '%s'", )

是因为kafka传递的字节,因此在传递字符串处encode()即可,分别是client.topics和producer.produce(),如下:

#!/bin/env python from pykafka import KafkaClient host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092' client = KafkaClient(hosts = host) topic = client.topics["test".encode()] # 将产生kafka同步消息,这个调用仅仅在我们已经确认消息已经发送到集群之后 with topic.get_sync_producer() as producer: for i in range(100): producer.produce(('test message ' + str(i ** 2)).encode()) 同步与异步 from pykafka import KafkaClient #可接受多个client client = KafkaClient(hosts ="192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092") #查看所有的topic client.topics print client.topics topic = client.topics['test_kafka_topic']#选择一个topic message = "test message test message" # 当有了topic之后呢,可以创建一个producer,来发消息,生产kafka数据,通过字符串形式, with topic.get_sync_producer() as producer: producer.produce(message) # 以上的例子将产生kafka同步消息,这个调用仅仅在我们已经确认消息已经发送到集群之后 #但生产环境,为了达到高吞吐量,要采用异步的方式,通过delivery_reports =True来启用队列接口; producer = topic.get_producer(sync=False, delivery_reports=True) producer.produce(message) try: msg, exc = producer.get_delivery_report(block=False) if exc is not None: print 'Failed to deliver msg {}: {}'.format(msg.partition_key, repr(exc)) else: print 'Successfully delivered msg {}'.format(msg.partition_key) except Queue.Empty: pass

(4) pykafka消费者api pykafka消费者分为simple和balanced两种

simple适用于需要消费指定分区且不需要自动的重分配(自定义) balanced自动分配则选择

#coding=utf-8 from pykafka import KafkaClient class KafkaTest(object): def __init__(self, host="192.168.237.129:9092"): self.host = host self.client = KafkaClient(hosts=self.host) def simple_consumer(self, topic, offset=0): """ 消费者指定消费 :param offset: :return: """ topic = self.client.topics[topic.encode()] partitions = topic.partitions last_offset = topic.latest_available_offsets() print("最近可用offset {}".format(last_offset)) # 查看所有分区 consumer = topic.get_simple_consumer(b"simple_consumer_group", partitions=[partitions[0]]) # 选择一个分区进行消费 offset_list = consumer.held_offsets print("当前消费者分区offset情况{}".format(offset_list)) # 消费者拥有的分区offset的情况 consumer.reset_offsets([(partitions[0], offset)]) # 设置offset msg = consumer.consume() print("消费 :{}".format(msg.value.decode())) msg = consumer.consume() print("消费 :{}".format(msg.value.decode())) msg = consumer.consume() print("消费 :{}".format(msg.value.decode())) offset = consumer.held_offsets print("当前消费者分区offset情况{}".format(offset)) # 3 def balance_consumer(self, topic, offset=0): """ 使用balance consumer去消费kafka :return: """ topic = self.client.topics["kafka_test".encode()] # managed=True 设置后,使用新式reblance分区方法,不需要使用zk,而False是通过zk来实现reblance的需要使用zk consumer = topic.get_balanced_consumer(b"consumer_group_balanced2", managed=True) partitions = topic.partitions print("分区 {}".format(partitions)) earliest_offsets = topic.earliest_available_offsets() print("最早可用offset {}".format(earliest_offsets)) last_offsets = topic.latest_available_offsets() print("最近可用offset {}".format(last_offsets)) offset = consumer.held_offsets print("当前消费者分区offset情况{}".format(offset)) while True: msg = consumer.consume() offset = consumer.held_offsets print("{}, 当前消费者分区offset情况{}".format(msg.value.decode(), offset)) if __name__ == '__main__': host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092' kafka_ins = KafkaTest(host) topic = 'test' # kafka_ins.simple_consumer(topic) kafka_ins.balance_consumer(topic)

使用consumber_group和consumer_id

# -* coding:utf8 *- from pykafka import KafkaClient host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092' client = KafkaClient(hosts = host) print(client.topics) # 消费者 topic = client.topics['test'.encode()] consumer = topic.get_simple_consumer(consumer_group='test_group', # 设置为False的时候不需要添加consumer_group,直接连接topic即可取到消息 auto_commit_enable=True, auto_commit_interval_ms=1, #这里就是连接多个zk zookeeper_connect='192.168.17.64:2181,192.168.17.65:2181,192.168.17.68:2181' consumer_id='test_id') for message in consumer: if message is not None: #打印接收到的消息体的偏移个数和值 print(message.offset, message.value)

报错:AttributeError: 'SimpleConsumer' object has no attribute '_consumer_group' 是因为kafka在传输的时候需要bytes,而不是str,所以在str上加上b标识就可以,如下:

# -* coding:utf8 *- from pykafka import KafkaClient host = '192.168.17.64:9092,192.168.17.65:9092,192.168.17.68:9092' client = KafkaClient(hosts = host) print(client.topics) # 消费者 topic = client.topics['test'.encode()] consumer = topic.get_simple_consumer(consumer_group=b'test_group', auto_commit_enable=True, auto_commit_interval_ms=1, consumer_id=b'test_id') for message in consumer: if message is not None: print(message.offset, message.value.decode('utf-8'))

不要重复消费,对已经消费过的信息进行舍弃

consumer = topic.get_simple_consumer(consumer_group=b'test_group', auto_commit_enable=True, auto_commit_interval_ms=1, consumer_id=b'test_id')

不希望得到历史数据的时候,需要使用auto_commit_enable这个参数。 当consumer_group=b'test_group',运行一次后,能够得到正常数据;再一次后,就数据读取不到了,如下:

{b'kafka_test': None, b'test': None}

如果要在读取一次数据,就需要修改consumber_group的id,例如修改成consumber_group=b'test_group_1'后,再运行一次,就可以正常读取数据了。 因为:是kafka的订阅原理,同一个group下,消费之后已经读取完,如果想得到数据必须修改consumber_group_id。 group是消费者中的概念,按照group(组)对消费者进行区分。对于每个group,需要先指定订阅哪个topic的消息,然后该topic下的partition会平均分配到group下面的consumer上。所以会出现以下这些情况: 1、一个topic被多个group订阅,那么一条消息就会被不同group中的多个consumer处理。 2、同一个group中,每个partition只会被一个consumer处理,这个consumer处理的消息不一定是同一个key的。所以需要在处理的地方判断。

三、kafka-python (1) kafka-python安装

PyPI安装

pip install kafka-python

conda安装

conda install -c conda-forge kafka-python

anaconda自带pip安装

/root/anaconda3/bin/pip install kafka-python (2) kafka-python的api https://kafka-python.readthedocs.io/en/master/apidoc/modules.html https://kafka-python.readthedocs.io/en/master/index.html https://pypi.org/project/kafka-python/

(3) kafka-python生产者

import time from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers = ['192.168.17.64:9092', '192.168.17.65:9092', '192.168.17.68:9092']) # Assign a topic topic = 'test' def test(): print('begin') n = 1 try: while (n


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有