這篇文章主要為大家展示了“python如何實現對kafka的基本操作”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“python如何實現對kafka的基本操作”這篇文章吧。
成都創新互聯是一家專業的成都網站建設公司,我們專注網站建設、網站制作、網絡營銷、企業網站建設,賣鏈接,廣告投放為企業客戶提供一站式建站解決方案,能帶給客戶新的互聯網理念。從網站結構的規劃UI設計到用戶體驗提高,創新互聯力求做到盡善盡美。
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
"""生產者""" def produce(self): producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers) for i in range(4): msg = "msg%d" %i producer.send(self.topic,key=str(i),value=msg) producer.close() """一個消費者消費一個topic""" def consume(self): #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers) consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers) print consumer.partitions_for_topic(self.topic) #獲取test主題的分區信息 print consumer.topics() #獲取主題列表 print consumer.subscription() #獲取當前消費者訂閱的主題 print consumer.assignment() #獲取當前消費者topic、分區信息 print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量 consumer.seek(TopicPartition(topic=self.topic, partition=0), 1) #重置偏移量,從第1個偏移量消費 for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic,message.partition,message.offset, message.key,message.value)) """一個消費者訂閱多個topic """ def consume2(self): consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092']) consumer.subscribe(topics=('TEST','TEST2')) #訂閱要消費的主題 print consumer.topics() print consumer.position(TopicPartition(topic='TEST', partition=0)) #獲取當前主題的最新偏移量 for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) """消費者(手動拉取消息)""" def consume3(self): consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092']) consumer.subscribe(topics=('TEST','TEST2')) while True: message = consumer.poll(timeout_ms=5) #從kafka獲取消息 if message: print message time.sleep(1)
def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()
以上是“python如何實現對kafka的基本操作”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注創新互聯行業資訊頻道!
文章題目:python如何實現對kafka的基本操作
標題路徑:http://m.2m8n56k.cn/article28/jdcgjp.html
成都網站建設公司_創新互聯,為您提供用戶體驗、自適應網站、網站設計、電子商務、建站公司、品牌網站建設
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯