中文字幕第五页-中文字幕第页-中文字幕韩国-中文字幕最新-国产尤物二区三区在线观看-国产尤物福利视频一区二区

python如何實現對kafka的基本操作

這篇文章主要為大家展示了“python如何實現對kafka的基本操作”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“python如何實現對kafka的基本操作”這篇文章吧。

成都創新互聯是一家專業的成都網站建設公司,我們專注網站建設、網站制作、網絡營銷、企業網站建設,賣鏈接廣告投放為企業客戶提供一站式建站解決方案,能帶給客戶新的互聯網理念。從網站結構的規劃UI設計到用戶體驗提高,創新互聯力求做到盡善盡美。

-- coding:utf-8 --

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic

"""生產者"""
def produce(self):
    producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
    for i in range(4):
        msg = "msg%d" %i
        producer.send(self.topic,key=str(i),value=msg)
    producer.close()

"""一個消費者消費一個topic"""
def consume(self):
    #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers)
    consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers)
    print consumer.partitions_for_topic(self.topic)  #獲取test主題的分區信息
print consumer.topics()  #獲取主題列表
print consumer.subscription()  #獲取當前消費者訂閱的主題
print consumer.assignment()  #獲取當前消費者topic、分區信息
print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量
consumer.seek(TopicPartition(topic=self.topic, partition=0), 1)  #重置偏移量,從第1個偏移量消費
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" 
        % (message.topic,message.partition,message.offset, message.key,message.value))

"""一個消費者訂閱多個topic """
def consume2(self):
    consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))  #訂閱要消費的主題
print consumer.topics()
print consumer.position(TopicPartition(topic='TEST', partition=0)) #獲取當前主題的最新偏移量
for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                      message.offset, message.key,
                                      message.value))
"""消費者(手動拉取消息)"""
def consume3(self):
    consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])
consumer.subscribe(topics=('TEST','TEST2'))
while True:
        message = consumer.poll(timeout_ms=5)   #從kafka獲取消息
        if message:
        print message
        time.sleep(1)

def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()

以上是“python如何實現對kafka的基本操作”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注創新互聯行業資訊頻道!

文章題目:python如何實現對kafka的基本操作
標題路徑:http://m.2m8n56k.cn/article28/jdcgjp.html

成都網站建設公司_創新互聯,為您提供用戶體驗自適應網站網站設計電子商務建站公司品牌網站建設

廣告

聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯

小程序開發
主站蜘蛛池模板: 热99re久久国超精品首页 | 欧美日韩无| 久草视频在线首页 | 欧美成 人h版在线观看 | 久久久国产精品网站 | 欧美成人三级伦在线观看 | 成人网视频在线观看免费 | 亚洲成人免费观看 | 在线播放免费播放av片 | 天天综合天天看夜夜添狠狠玩 | 亚洲精品美女在线观看 | 亚洲qingse中文久久网 | 精品亚洲欧美高清不卡高清 | 国产aaa毛片| 日本一级全黄大片 | 亚洲涩涩精品专区 | 亚洲在线天堂 | 国产精品亚洲视频 | 久久亚洲国产高清 | 欧美成人亚洲高清在线观看 | 国产孕妇孕交一级毛片 | 一区二区三区网站在线免费线观看 | 成人性视频在线三级 | 老司机黄色影院 | 成人a在线| 欧美激情视频一区二区免费 | 免费的一级片网站 | 99久久免费观看 | 欧美视频一区二区三区在线观看 | 国产一区二区三区四区在线观看 | 九九精品99久久久香蕉 | 亚洲一区二区精品 | 日本一级高清片免费 | japonensis国产福利 | 日韩欧美高清在线观看 | 日本欧美中文 | 成人精品视频网站 | 欧美精品久久久久久久久大尺度 | 午夜爽爽| 国产日韩精品一区二区三区 | 日本国产欧美色综合 |