Fluem 資料整合

2021-10-10 17:36:15 字數 3882 閱讀 5127

1.監控 乙個目錄,將資料列印出來

建立 spoolingtest.conf

a: 代表agent的名稱

type=spooldir ; 監控乙個目錄,只要目錄中有新的檔案就會被取過來,不能出現檔名一樣的檔案

a.sources = r1

a.sinks = k1

a.channels = c1

#指定spooldir的屬性

a.sources.r1.type = spooldir

a.sources.r1.spooldir = /usr/local/soft/data

a.sources.r1.fileheader = true

a.sources.r1.interceptors = i1

a.sources.r1.interceptors.i1.type = timestamp

#指定sink的型別

a.sinks.k1.type = logger

#指定channel

a.channels.c1.type = memory

a.channels.c1.capacity = 1000

a.channels.c1.transactioncapacity = 100

#組裝a.sources.r1.channels = c1

a.sinks.k1.channel = c1

啟動agent

flume-ng agent -n a -f ./spoolingtest.conf -dflume.root.logger=debug,console

2、監控目錄將資料儲存到hdfs

a.sources = r1

a.sinks = k1

a.channels = c1

#指定spooldir的屬性

a.sources.r1.type = spooldir

a.sources.r1.spooldir = /usr/local/soft/data

a.sources.r1.fileheader = true

a.sources.r1.interceptors = i1

a.sources.r1.interceptors.i1.type = timestamp

#將資料儲存到hdfs

a.sinks.k1.type = hdfs

#如果不存在會自動建立

a.sinks.k1.hdfs.path =hdfs://master:9000/flume/data

a.sinks.k1.hdfs.fileprefix = pre-

a.sinks.k1.hdfs.minblockreplicas= 1

a.sinks.k1.hdfs.filetype = datastream

#指定channel

a.channels.c1.type = memory

a.channels.c1.capacity = 1000

a.channels.c1.transactioncapacity = 100

#組裝a.sources.r1.channels = c1

a.sinks.k1.channel = c1

flume-ng agent -n a -f ./filetohdfs.conf -dflume.root.logger=debug,console

3.監控乙個檔案將 資料儲存hdfs

hbaselogtohdfs.conf

a.sources = r1

a.sinks = k1

a.channels = c1

#指定exec的屬性

a.sources.r1.type = exec

#指定監控的命令

a.sources.r1.command = tail -f /usr/local/soft/hbase-1.4.6/logs/hbase-root-master-master.log

#指定sink的型別

a.sinks.k1.type = hdfs

#如果不存在會自動建立

a.sinks.k1.hdfs.path = hdfs://master:9000/flume/hbase/master

a.sinks.k1.hdfs.fileprefix = pre-

a.sinks.k1.hdfs.minblockreplicas= 1

a.sinks.k1.hdfs.filetype = datastream

#指定channel

a.channels.c1.type = memory

a.channels.c1.capacity = 1000

a.channels.c1.transactioncapacity = 100

#組裝a.sources.r1.channels = c1

a.sinks.k1.channel = c1

啟動flume-ng agent -n a -f ./hbaselogtohdfs.conf -dflume.root.logger=debug,console

4、繫結乙個埠,實時收集資料

netcat.conf

a.sources = r1

a.sinks = k1

a.channels = c1

#繫結乙個埠

a.sources.r1.type=netcat

a.sources.r1.bind=master

a.sources.r1.port=8888

#指定sink的型別

a.sinks.k1.type = logger

#指定channel

a.channels.c1.type = memory

a.channels.c1.capacity = 1000

a.channels.c1.transactioncapacity = 100

#組裝a.sources.r1.channels = c1

a.sinks.k1.channel = c1

啟動agent

flume-ng agent -n a -f ./netcat.conf -dflume.root.logger=debug,console

6、flume將資料寫kafka

flumetokafka.conf

agent.sources=s1

agent.channels=c1

agent.sinks=k1

agent.sources.s1.type=exec

#監聽檔案位址

agent.sources.s1.command=tail -f /usr/flume/log.log

agent.channels.c1.type=memory

agent.channels.c1.capacity=10000

agent.channels.c1.transactioncapacity=100

#設定kafka接收器

agent.sinks.k1.type=org.apache.flume.sink.kafka.kafkasink

#設定kafka的broker位址和埠號

agent.sinks.k1.brokerlist=master:9092,node1:9092,node2:9092

#設定kafka的topic 如果topic不存在會自動建立乙個topic,預設分割槽為1,副本為1

agent.sinks.k1.topic=flume

#設定序列化方式

agent.sinks.k1.serializer.class=kafka.serializer.stringencoder

#將三個主件串聯起來

agent.sources.s1.channels=c1

agent.sinks.k1.channel=c1

資料倉儲EDW層資料整合整合的思考

資料倉儲edw層資料整合整合的思考 比爾 門恩 bill inmon 給出了資料倉儲這樣乙個定義,資料倉儲是在企業管理和決策中面向主題的 整合的 與時間相關的 不可修改的資料集合。今天單就資料倉儲的整合整合特性進行思考,我想資料倉儲的整合性大致主要體現在如下幾個方面。1 將企業相關it系統經過面向主...

slam資料集整合

根據大家的要求,在此整合一下常用的幾個資料集。我平時話太囉嗦了,這裡就簡單一些。為啥編輯器的分隔線都這麼萌 1.tum資料集 這個大家用的人都知道,rgb d資料集,有很多個sequence,自帶ground truth軌跡與測量誤差的指令碼 python寫的,還有一些有用的函式 有一些很簡單 xy...

pandas 資料整合concat,merge

這個函式橫向合併時,將表的所有資料通過索引相同合併,而merge函式可以選擇鍵值合併 引數功能 aixs 設定合併方式,0為縱向 1為橫向 jion 設定是交集還是並集,inner 是交集 outer 是並集 import pandas as pd import numpy as np df1 pd...