資料處理 流資料處理利器

2021-10-16 01:14:04 字數 4146 閱讀 2168

流處理 (stream processing) 是一種計算機程式設計正規化,其允許給定乙個資料序列 (流處理資料來源),一系列資料操作 (函式) 被應用到流中的每個元素。同時流處理工具可以顯著提高程式設計師的開發效率,允許他們編寫有效、乾淨和簡潔的**。

流資料處理在我們的日常工作中非常常見,舉個例子,我們在業務開發中往往會記錄許多業務日誌,這些日誌一般是先傳送到 kafka,然後再由 job 消費 kafaka 寫到 elasticsearch,在進行日誌流處理的過程中,往往還會對日誌做一些處理,比如過濾無效的日誌,做一些計算以及重新組合日誌等等,示意圖如下:

go-zero是乙個功能完備的微服務框架,框架中內建了很多非常實用的工具,其中就包含流資料處理工具fx,下面我們通過乙個簡單的例子來認識下該工具:

inputstream 函式模擬了流資料的產生,outputstream 函式模擬了流資料的處理過程,其中 from 函式為流的輸入,walk 函式併發的作用在每乙個 item 上,filter 函式對 item 進行過濾為 true 保留為 false 不保留,foreach 函式遍歷輸出每乙個 item 元素。

乙個流的資料處理可能存在許多的中間操作,每個中間操作都可以作用在流上。就像流水線上的工人一樣,每個工人操作完零件後都會返回處理完成的新零件,同理流處理中間操作完成後也會返回乙個新的流。

fx 的流處理中間操作:

操作函式

功能輸入

distinct

去除重複的 item

keyfunc,返回需要去重的 key

filter

過濾不滿足條件的 item

filterfunc,option 控制併發量

group

對 item 進行分組

keyfunc,以 key 進行分組

head

取出前 n 個 item,返回新 stream

int64 保留數量

map物件轉換

mapfunc,option 控制併發量

merge

合併 item 到 slice 並生成新 stream

reverse

反轉 item

sort

對 item 進行排序

lessfunc 實現排序演算法

tail

與 head 功能類似,取出後 n 個 item 組成新 stream

int64 保留數量

walk

作用在每個 item 上

walkfunc,option 控制併發量

下圖展示了每個步驟和每個步驟的結果:

通過 from 函式構建流並返回 stream,流資料通過 channel 進行儲存:

// 例子

s := int

fx.from(func(source chan inte***ce{}) })// 原始碼func from(generate generatefunc) stream )go func() ()return range(source)}

filter

filter 函式提供過濾 item 的功能,filterfunc 定義過濾邏輯 true 保留 item,false 則不保留:

// 例子 保留偶數

s := int

fx.from(func(source chan inte***ce{}) }).filter(func(item inte***ce{}) bool return false})// 原始碼func (p stream) filter(fn filterfunc, opts ...option) stream , pipe chan inte***ce{}) }, opts...)}

group

group 對流資料進行分組,需定義分組的 key,資料分組後以 slice 存入 channel:

reverse

reverse 可以對流中元素進行反轉處理:

distinct 對流中元素進行去重,去重在業務開發中比較常用,經常需要對使用者 id 等做去重操作:

// 例子

fx.just(1, 2, 2, 2, 3, 3, 4, 5, 6).distinct(func(item inte***ce{}) inte***ce{} ).foreach(func(item inte***ce{}) )

// 結果為 1,2,3,4,5,6

// 原始碼

func (p stream) distinct(fn keyfunc) stream )

threading.gosafe(func() ]lang.placeholdertype)

for item := range p.source }})return range(source)}

walk

walk 函式併發的作用在流中每乙個 item 上,可以通過 withworkers 設定併發數,預設併發數為 16,最小併發數為 1,如設定 unlimitedworkers 為 true 則併發數無限制,但併發寫入流中的資料由 defaultworkers 限制,walkfunc 中使用者可以自定義後續寫入流中的元素,可以不寫入也可以寫入多個元素:

// 例子

fx.just("aaa", "bbb", "ccc").walk(func(item inte***ce{}, pipe chan inte***ce{}) ).foreach(func(item inte***ce{}) )// 原始碼func (p stream) walklimited(fn walkfunc, option *rxoptions) stream , option.workers)go func() wg.add(1)go func() ()// 作用在每個元素上fn(item, pipe)}()}// 等待處理完成wg.wait()close(pipe)}()return range(pipe)}

fx 工具除了進行流資料處理以外還提供了函式併發功能,在微服務中實現某個功能往往需要依賴多個服務,併發的處理依賴可以有效的降低依賴耗時,提公升服務的效能。

注意 fx.parallel 進行依賴並行處理的時候不會有 error 返回,如需有 error 返回或者有乙個依賴報錯需要立馬結束依賴請求請使用mapreduce工具進行處理。

專案位址

元件位址

爬蟲 資料處理 pandas資料處理

使用duplicated 函式檢測重複的行,返回元素為布林型別的series物件,每個元素對應一行,如果該行不是第一次出現,則元素為true keep引數 指定保留哪一重複的行資料 dataframe替換操作 使用df.std 函式可以求得dataframe物件每一列的標準差 資料清洗清洗重複值 清...

海量資料處理利器greenplum 初識

如果想在資料倉儲中快速查詢結果,可以使用greenplum。greenplum資料庫也簡稱gpdb。它擁有豐富的特性 第一,完善的標準支援 gpdb完全支援ansi sql 2008標準和sql olap 2003 擴充套件 從應用程式設計介面上講,它支援odbc和jdbc。完善的標準支援使得系統開...

資料處理 pandas資料處理優化方法小結

資料處理時使用最多的就是pandas庫,pandas在資料處理方面很強大,整合了資料處理和資料視覺化。pandas的視覺化使用的是matplotlib。回到主題 計算資料的某個欄位的所有值,對其欄位所有值進行運算 處理的字段資料為時間戳,需要計算該時間戳距離現在的時間,單位為天。一般方法 使用現在的...