Kafka 快速起步 作者 杜亦舒

2021-09-07 08:32:53 字數 4196 閱讀 8405

主要內容:

1. kafka 安裝、啟動

2. 訊息的 生產、消費

3. 配置啟動集群

4. 集群下的容錯測試

5. 從檔案中匯入資料,並匯出到檔案

tar -xzf kafka_2.10-0.10.1.1.tgz

cd kafka_2.10-0.10.1.1

> bin/zookeeper-server-start.sh \

config/zookeeper.properties

> bin/kafka-server-start.sh \

config/server.properties

開啟乙個新的終端視窗

bin/kafka-topics.sh --create \

--zookeeper localhost:2181\

--replication-factor 1\

--partitions 1\

--topic test

開啟乙個新的終端視窗

bin/kafka-console-producer.sh \

--broker-list localhost:9092\

--topic test

進入輸入模式,隨意輸入資訊,例如:

hello world

hi

開啟乙個新的終端視窗

bin/kafka-console-consumer.sh \

--bootstrap-server localhost:9092\

--topic test \

--from-beginning

便會顯示出剛才傳送的兩條訊息:

hello world

hi

這時可以開啟傳送訊息的終端視窗,輸入新的資訊,再返回來就可以看到自動接收到了新訊息

> cp config/server.properties \

config/server-1.properties

> cp config/server.properties \

config/server-2.properties

修改 config/server-1.properties 的以下幾項配置:

broker.id=1listeners=plaintext:

修改 config/server-2.properties 的以下幾項配置: 

broker.id=2listeners=plaintext:

> bin/kafka-server-start.sh \

config/server-1.properties &

> bin/kafka-server-start.sh \

config/server-2.properties &

bin/kafka-topics.sh --create \

--zookeeper localhost:2181\

--replication-factor 3\

--partitions 1\

--topic my-replicated-topic

bin/kafka-console-producer.sh 

--broker-list localhost:9092\

--topic my-replicated-topic

輸入訊息:

my test message 1my test message 2

bin/kafka-console-consumer.sh \

--bootstrap-server localhost:9092\

--from-beginning \

--topic my-replicated-topic

可以正常取得訊息

# 取得server1的程序號
ps aux | grep server-1.properties

# 殺掉程序
kill -9 43116

讀取訊息

bin/kafka-console-consumer.sh \

--bootstrap-server localhost:9092\

--from-beginning \

--topic my-replicated-topic

返回資訊:

my test message 1my test message 2

仍然可以正常取得訊息

kafka 中的 connecter 可以與外部系統進行連線,例如檔案系統、資料庫

下面實驗乙個簡單檔案系統互動,從乙個檔案中匯入資料,然後匯出到另乙個檔案中

echo -e "

foo\nbar

" > test.txt

bin/connect-standalone.sh \

config/connect-standalone.properties \

config/connect-file-source.properties \

config/connect-file-sink.properties

命令執行後,會輸出一系列的日誌資訊,等待執行完畢

cat test.sink.txt

返回結果:

foo

bar

成功匯出了 test.txt 中的資料 

執行第2步的命令後,為什麼是去讀test.txt?為什麼寫入了test.sink.txt?中間的過程是什麼樣的?

原因是在於兩個配置檔案

config/connect-file-source.properties(匯入配置)

name=local-file-source

connector.class=filestreamsource

tasks.max=1file=test.txt

topic=connect-test

file指定了是從test.txt中匯入資料

topic指定了把資料傳送到connect-test這個topic

connect-file-sink.properties(匯出配置)

name=local-file-sink

connector.class=filestreamsink

tasks.max=1file=test.sink.txt

topics=connect-test

file指定了把資料匯出到test.txt中匯入資料

topic指定從connect-test這個topic中讀取資料

檢視一下connect-test這個topic

bin/kafka-console-consumer.sh \

--bootstrap-server localhost:9092\

--topic connect-test \

--from-beginning

結果為:

,"

payload

":"foo"}

,"payload

":"bar

"}

現在向test.txt中新增一條新資料:

echo "

another line

" >> test.txt

再次執行cat test.sink.txt就會看到剛剛新增的資料:

kafka搭建 快速搭建Kafka服務

搞流處理的話,無論如何是繞不過kafka的了,還好kafka是乙個概念比較好理解的架構模型。我覺得官方的這三張圖已經很好地把模型結構給闡述清楚了。發布 訂閱模型 實現訊息寫入與訊息讀取解耦。kafka相當於是乙個訊息緩衝池 2.日誌檔案順序結構 kafka的高吞吐量就是依賴順序寫入 當然還包括了一些...

自動擋車型,快速起步

以下是文章出處 關注他sport模式 s擋 sport模式 s擋其實許多車都有,原理十分簡單,只需要改變油門的靈敏度和變速箱的換擋積極性就能達到讓加速更加積極的目的。變速箱的降擋積極,你就會覺得加速比較爽快,而如果降擋慢,就會感覺動力憋著出不來,而真正需要動力的多數是在市區穿插或者山路行駛,這種情況...

kafka之快速啟動

topic 主題 是kafka集群中用於儲存某一類 或者某一種資料。主題的資料只能新增 broker 集群中的每乙個分機 都是乙個broker 相當於kafka集群的節點 consumer 消費者 用來從kafka 集群中讀取訊息 producer 生產者 用於從kafka 集群中傳送訊息 stre...