Developing with pykafka and kafka-python

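I. Introduction
     The two commonly used Python libraries for connecting to Kafka are kafka-python and pykafka. kafka-python has the larger user base and is the more mature of the two, but it has no ZooKeeper support. pykafka is the successor to Samsa and does support ZooKeeper: the producer connects directly to the list of Kafka brokers, and only the consumer uses ZooKeeper.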

II. Using kafka-python

(1) Installing kafka-python

        pip install kafka-python

(2) The kafka-python API

(3) kafka-python producer
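
As a minimal sketch of a producer (not necessarily the author's original code), the example below sends the numbers 1 to 10 to a topic with a short pause between messages. The function names `build_payloads` and `send_numbers`, the topic name `test`, and the 0.5-second interval are assumptions chosen for illustration; the `KafkaProducer`, `send`, `flush` and `close` calls are part of kafka-python.

```python
import time


def build_payloads(count):
    """Return the byte payloads b'1' .. b'<count>' that will be sent."""
    return [str(i).encode("utf-8") for i in range(1, count + 1)]


def send_numbers(bootstrap_servers, topic="test", count=10, interval=0.5):
    """Send the numbers 1..count to `topic`, pausing `interval` seconds between sends."""
    # Imported here so build_payloads() can be used without a broker or the library.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        for payload in build_payloads(count):
            producer.send(topic, value=payload)  # asynchronous; returns a future
            time.sleep(interval)
        producer.flush()  # block until buffered messages have been sent
    finally:
        producer.close()
```

With a reachable broker, it could be called as `send_numbers(['192.168.17.64:9092'])`. Values must be bytes unless a `value_serializer` is passed to `KafkaProducer`, which is why the numbers are encoded first.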

(4) kafka-python consumer
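
A consumer can be sketched along the following lines; this is an illustration under assumptions, not necessarily the code the author ran. The helper names `handle_records` and `consume` and the default topic `test` are hypothetical. Iterating over a `KafkaConsumer` yields `ConsumerRecord` objects, and the loop blocks while waiting for new messages.

```python
def handle_records(records, handle=print):
    """Pass every record from an iterable (e.g. a KafkaConsumer) to `handle`.

    Returns the number of records processed once the iterable is exhausted.
    """
    count = 0
    for record in records:
        handle(record)
        count += 1
    return count


def consume(bootstrap_servers, topic="test"):
    """Subscribe to `topic` and print each ConsumerRecord as it arrives."""
    # Imported here so handle_records() can be used without the library installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
    try:
        handle_records(consumer)  # blocks; stop with Ctrl+C
    finally:
        consumer.close()
```

Keeping the loop in `handle_records` separate from the connection code makes it possible to try the handling logic on a plain list before pointing it at a broker.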


Output:
ConsumerRecord(topic='test', partition=0, offset=246, timestamp=1531980887190, timestamp_type=0, key=None, value=b'1', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=247, timestamp=1531980887691, timestamp_type=0, key=None, value=b'2', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=248, timestamp=1531980888192, timestamp_type=0, key=None, value=b'3', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=249, timestamp=1531980888694, timestamp_type=0, key=None, value=b'4', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=250, timestamp=1531980889196, timestamp_type=0, key=None, value=b'5', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=251, timestamp=1531980889697, timestamp_type=0, key=None, value=b'6', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=252, timestamp=1531980890199, timestamp_type=0, key=None, value=b'7', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=253, timestamp=1531980890700, timestamp_type=0, key=None, value=b'8', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=254, timestamp=1531980891202, timestamp_type=0, key=None, value=b'9', checksum=None, serialized_key_size=-1, serialized_value_size=1)
ConsumerRecord(topic='test', partition=0, offset=255, timestamp=1531980891703, timestamp_type=0, key=None, value=b'10', checksum=None, serialized_key_size=-1, serialized_value_size=2)

        import kafka

        consumer = kafka.KafkaConsumer(
            bootstrap_servers=['192.168.17.64:9092', '192.168.17.65:9092', '192.168.17.68:9092'],
            group_id='test_group_id',
            auto_offset_reset='latest',
            enable_auto_commit=False,  # do not commit offsets automatically
        )
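enable_auto_commit=False turns off automatic offset commits. auto_offset_reset='latest', which is also the default, makes the consumer start from the newest offset when its group has no committed offset. Giving this consumer a new group_id of its own means it does not interfere with other applications consuming the same data, yet it still reads the latest messages. That makes it suitable for alerting, i.e. spotting problems before users do.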
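
The settings described above could be packaged as in the sketch below. Generating a random group_id on every start is an assumption made here, one way to guarantee that the group never has committed offsets; a fixed new group_id, as in the original snippet, works too as long as nothing ever commits for it. The names `monitor_consumer_config`, `open_monitor_consumer` and the `monitor` prefix are hypothetical.

```python
import uuid


def monitor_consumer_config(bootstrap_servers, group_prefix="monitor"):
    """Build KafkaConsumer keyword arguments for a read-only monitoring consumer."""
    return {
        "bootstrap_servers": list(bootstrap_servers),
        # Assumption: a fresh group per run, so no committed offsets ever exist.
        "group_id": "{}-{}".format(group_prefix, uuid.uuid4().hex),
        "auto_offset_reset": "latest",  # start from the newest messages
        "enable_auto_commit": False,    # never commit offsets
    }


def open_monitor_consumer(topic, bootstrap_servers):
    """Create a KafkaConsumer for `topic` using the monitoring settings."""
    # Imported here so the config helper can be used without the library installed.
    from kafka import KafkaConsumer

    return KafkaConsumer(topic, **monitor_consumer_config(bootstrap_servers))
```

Because the consumer has its own group, it receives every message on the topic independently of the groups used by other applications.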

Author: 刘小恺 (Kyle)
If you have questions, contact the author.