kafak python函式使用詳解

2022-02-18 17:35:11 字數 2459 閱讀 1056

from

kafka.structs import topicpartition,offsetandmetadata

configs =

topics=('

test

', )

# 注意指定分割槽將會失去故障轉移/負載均衡的支援,當然也沒有了自動分配分割槽的功能(因為已經人為指定了嘛)

topic_partition = topicpartition(topic='

test

',partition=0

) #

consumer = kafkaconsumer(**configs)

# 引數必須是列表,表示訂閱的topic/partition列表

consumer.assign([topic_partition])

# 獲取分給當前使用者的topic/partition資訊

consumer.assignment()

# 提交偏移量:可以告知伺服器當前偏移量,也可以設定偏移量

consumer.commit()

# 非同步提交

consumer.commit_async()

# 獲取伺服器的最後確認的偏移量,即最新資料開始讀取的地方

consumer.committed(topicpartition(topic='

test

', partition=0

))# 獲取伺服器當前最新的偏移量,讀到這個偏移量後,所有資料都讀取完了

consumer.highwater(topicpartition(topic='

test

', partition=0

))# 獲取消費的效能

consumer.metrics()

# 獲取某個topic的partition資訊

consumer.partitions_for_topic(topic)

# 獲取下一條資料開始讀取的偏移量,即從這個便宜量開始繼續讀取資料

consumer.position(topicpartition(topic='

test

', partition=0

))# 從指定偏移量位置開始讀取資料

consumer.seek(topicpartition(topic='

test

', partition=0), 283

)# 從頭開始讀取資料

consumer.seek_to_beginning()

# 從最後開始讀取資料

consumer.seek_to_end()

# 訂閱topic,可以訂閱多個,可以使用正規表示式匹配多個

consumer.subscribe()

# 獲取訂閱的資訊,無法獲取使用assign分配的topic/partition資訊

consumer.subscription()

# 獲取當前使用者授權的topic資訊

consumer.topics()

# 取消訊息的訂閱

consumer.unsubscribe()

# 一起消費多條訊息,最多等待時間timeout_ms,最多消費max_records

consumer.poll(self, timeout_ms=0, max_records=none)

# 獲取指定分割槽第乙個偏移量

consumer.beginning_offsets([topic_partition])

# 獲取指定分割槽最後乙個偏移量,最新的偏移量

consumer.end_offsets([topic_partition])

# 關閉連線

consumer.close()

# #consumer.seek(topic_partition,

284)

for message in

consumer:

print(message)

消費者設定為自動提交偏移量時,需要同時設定自動提交偏移量的時間間隔。如果消費完若干訊息後,還沒有到自動提交偏移量的時間時,應用掛了,則系統記錄的偏移量還是之前的值,那麼剛才消費的若干訊息,會在應用重連之後重新消費

消費段記錄下發送給伺服器的偏移量,獲取最新資料時再判斷這個偏移量是否正確會會

kafka 目前支援ssl、sasl/kerberos、sasl/plain三種認證機制。目前支援以下安全措施:

true(預設):自動提交偏移量,可以通過配置 auto.commit.interval.ms屬性來控制提交偏移量的頻率。(基於時間間隔)

false:手動控制偏移量。可以在程式邏輯必要的時候提交偏移量,而不是基於時間隔。此時可以進行同步,非同步,同步非同步組合(參考相應api)。

無法讀取偏移量時候讀取訊息的設定

latest(預設):從最新記錄讀取資料。

earliest:從起始位置讀取資料

參考:1、

2、3、

4、5、

6、

函式 使用函式指標操作函式

設計乙個名為calculate 的函式,他接受兩個double 值和乙個指向函式的指標,而被指向的函式接受兩個double引數,並返回乙個double值 calculate 函式的型別也是double,並返回被指向的函式值用calculate 的兩個double引數計算得到的值。例如,假設add 函...

eval函式 php PHP eval函式使用介紹

eval echo hello world 上邊 等同於下邊的 echo hello world 在瀏覽器中都輸出 hello world 運用eval 要注意幾點 1.eval函式的引數的字串末尾一定要有分號,在最後還要另加乙個分號 這個分號是php限制 2.注意單引號,雙引號和反斜槓的運用。如果...

函式 函式使用效能

以便 defer 能在合適時機執行 錯誤的例子 func example 這地方有個問題,example 函式是乙個加鎖操作。m.unlock 只有在 example 函式結束的時候才執行,那麼每次迴圈實際上是執行m.lock 語句,這個時候解鎖操作會被延遲到函式結束。很顯然這個邏輯並不是我們想要的...