Pitfalls of Working with Kafka from Python

2022-09-23 18:21:13  Words: 2718  Views: 9229

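Our team works on big data and needs to write data into Kafka. I have been learning Python lately, so this was a good chance to practice. Searching online, I found that most examples used pykafka. After a whole round of installing things I finally got it working. Here is the code: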

# coding: utf-8
import sys

from pykafka import KafkaClient

# Read the broker address from set.txt; fall back to a default if that fails
ad = []
try:
    with open("set.txt") as ini:
        ad = ini.readline().splitlines()
except Exception as e:
    print("open settings file error:", type(e))
    ad = ["192.168.1.121:9092"]
print("open ini file")

try:
    client = KafkaClient(hosts=ad[0])
    print("topics:", client.topics)
    topic = client.topics["mytopic"]
except Exception as e:
    print("opening kafka error:%s" % type(e))
    sys.exit(1)

print("before threading")
dct2 = {"id": 1}  # hypothetical record; the real data is elided in the post
try:
    # Under Python 3, pykafka's produce() expects bytes, not str
    with topic.get_sync_producer() as producer:
        producer.produce(str(dct2).encode("utf-8"))
except Exception as e:
    print("error:", type(e))

print("init consumer")
try:
    # Consumer creation is elided in the post; a simple consumer is assumed here
    consumer = topic.get_simple_consumer()
    while True:
        print("nn", type(consumer))
        for message in consumer:
            print("mm")
            if message is not None:
                print(message.offset, message.value)
except Exception as e:
    print(e, type(e))

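Result: the topics could be listed, and writing data produced no error messages. However, the consumers received no data, whether reading directly with Kafka's own tools or through a consumer written in Python.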
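Whatever the root cause here (the post does not identify one), one detail of the first listing is worth a look: it builds the message with `str(dct2)`. That produces Python's repr of the dict rather than JSON, and under Python 3 pykafka's `produce()` expects bytes. A minimal sketch of the difference, using a hypothetical stand-in for `dct2`; nothing here talks to Kafka:

```python
import json

# Hypothetical stand-in for the post's elided dct2
dct2 = {"id": 1, "name": "test"}

as_repr = str(dct2)                          # Python repr: single quotes, not JSON
as_json = json.dumps(dct2).encode("utf-8")   # JSON text encoded to bytes


def decode(raw):
    """Decode a message value the way a JSON-expecting consumer would."""
    return json.loads(raw)  # json.loads accepts str or bytes (Python 3.6+)


print(type(as_repr).__name__, type(as_json).__name__)  # str bytes
print(decode(as_json) == dct2)                          # True
try:
    decode(as_repr)
except json.JSONDecodeError as err:
    print("repr is not valid JSON:", err.msg)
```

Encoding explicitly as JSON bytes lets a consumer call `json.loads` on `message.value` directly.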

Later I switched to kafka-python and everything worked. Here is the code:

# coding: utf-8
import sys
import json
import threading

from kafka import KafkaConsumer, KafkaProducer

# Read the broker list from set.txt; fall back to defaults if that fails
ad = []
try:
    with open("set.txt") as ini:
        ad = ini.readline().splitlines()
except Exception as e:
    ad = ["192.168.1.121:9092,192.168.1.122:9092"]
    print("opening settings file error:", e, type(e))
print("opened ini file")

try:
    # kafka-python has no client.topics dict (that is pykafka's API);
    # list topics through a short-lived consumer instead
    checker = KafkaConsumer(bootstrap_servers=ad[0].split(","))
    print("topics:", checker.topics())
    checker.close()
except Exception as e:
    print("opening kafka error:", e, type(e))
    sys.exit(1)

try:
    # One producer, shared by all threads; values are sent as JSON bytes
    producer = KafkaProducer(bootstrap_servers=ad[0].split(","),
                             value_serializer=lambda m: json.dumps(m).encode("utf-8"))
except Exception as e:
    print("opening kafka error:", e, type(e))
    sys.exit(1)


def tf(producer, i):
    # Hypothetical worker; the original thread function is elided in the post
    producer.send("mytopic", {"thread": i})


print("before threading")
threads = []
for i in range(0, 12):
    try:
        t = threading.Thread(target=tf, args=(producer, i))
        t.start()
        threads.append(t)
    except Exception as e:
        print("thread error at thread:%d:%s,%s" % (i, e, type(e)))

for t in threads:
    t.join()          # wait for all workers to finish
producer.flush()      # block until buffered records have been sent
print("main thread is ended")

Both code listings above have been abridged.
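The second listing shares one producer across 12 threads, which kafka-python's documentation describes as the intended usage (`KafkaProducer` is thread-safe, and one shared instance is generally faster than several). The post does not show the worker `tf`. A minimal sketch of the start/join/flush shape, assuming a hypothetical in-memory `RecordingProducer` in place of `KafkaProducer` so it runs without a broker:

```python
import json
import threading


class RecordingProducer:
    """Hypothetical stand-in for kafka.KafkaProducer: stores serialized
    values in memory instead of sending them to a broker."""

    def __init__(self, value_serializer):
        self._serialize = value_serializer
        self._lock = threading.Lock()
        self.sent = []

    def send(self, topic, value):
        data = self._serialize(value)
        with self._lock:  # many threads call send() on this one instance
            self.sent.append((topic, data))

    def flush(self):
        pass  # nothing is buffered in this stand-in


def tf(producer, idx, count=5):
    # Hypothetical worker: each thread sends `count` small JSON records
    for seq in range(count):
        producer.send("mytopic", {"thread": idx, "seq": seq})


def run(num_threads=12):
    producer = RecordingProducer(
        value_serializer=lambda m: json.dumps(m).encode("utf-8"))
    threads = [threading.Thread(target=tf, args=(producer, i))
               for i in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()        # wait for every worker before finishing
    producer.flush()    # with a real KafkaProducer this pushes out buffered records
    return producer


if __name__ == "__main__":
    p = run()
    print(len(p.sent))  # 60
```

With the real `KafkaProducer`, `send()` is asynchronous and returns a future, so the final `flush()` is what waits for buffered records to be delivered before the program exits.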
