基於視窗的實時統計

2021-08-09 19:35:53 字數 2748 閱讀 8982

實時資料是無邊界的,即不斷地有資料輸入,但我們的統計一般是有時間範圍的,離線統計以年月日為統計週期,最小能到小時週期,如果是分鐘甚至秒級別計算,則可認為是實時計算,我們把實時資料流按時間段分割成乙個個視窗,則可基於視窗進行資料統計。

我司開源pike支援三種視窗,結合各種udaf,通過sql就能能實現各種聚合統計:

跳動視窗是最直觀,最簡單的介面,如下圖,t1->t2為乙個視窗,t2->t3為乙個視窗,各個視窗之間沒有重疊,實時統計結果也是基於各個視窗內的資料,結果輸出頻率等於統計週期。

跳動視窗比較常用,一般適用於統計無交叉分鐘級別實時流量。

pike example:

withperiod 5m

select top 100 output(dt(outputctx())) as dt,

output('pc客戶端') as plt,

mjoin(dim_channelid,'dim_sync_channel','series_titlechinese') as title,

count(distinct userid, ipvalue) as uv

from dol_client

group by title

order by uv desc

滑動視窗相對跳動視窗稍複雜,主要在於相鄰視窗間有重疊,如下圖,t1->t3為視窗w1,t2->t4為視窗w2,w2與w1重疊t2->t3,此時w2相對w1滑動視窗為t1->t2,w1統計視窗為t1->t3, 統計視窗必須為滑動視窗的整數倍,即(t3-t1)%(t2-t1)=0。若統計視窗等於滑動視窗,則滑動視窗轉化為跳動視窗,因此,可認為跳動視窗是滑動視窗的特例。

滑動視窗主要適用於統計最近m時間內資料,輸出結果間隔為n, n <= m 且 m % n = 0。

pike example:

//以5分鐘為輸出頻率,同時統計最近5分鐘,最近10分鐘,最近4小時網頁端點直播uv和vv

withperiod 5m

select output(dt(outputctx())) as dt,

'ikan' as plt,

case when dim_liveondemand_c=102 then '點播' else '直播' end as type,

count(distinct userid,ipvalue) as uv_5m,

count(distinct userid,ipvalue,channelid) as vv_5m,

move('10m',count(distinct userid,ipvalue)) as uv_10m,

move('10m',count(distinct userid,ipvalue,channelid)) as vv_10m,

move('4h',linearcount(10000000,userid,ipvalue)) as uv_4h,

move('4h',linearcountex(100,userid,ipvalue,channelid)) as vv_4h,

from dol_ikan

group by plt,type

累計視窗則是累計乙個時間段內資料不斷輸出,例如w1為從t1開始累計到t2的資料,w2為從t1開始累計到t3的資料,w3為從t1開始累計到t4的資料,w1、w2、w3共享初始狀態;在乙個完整的累計週期內,完整累計週期必須為輸出頻率的整數倍,t1為初始狀態,t2輸出w1統計結果,t3輸出w2統計結果,t4輸出w3統計結果,t2-t1=t3-t2=t4-t3,(t3-t1)%(t2-t1)=0,(t4-t1)%(t2-t1)=0;下乙個完整累計週期則清零為初始化狀態重新開始統計,例如w6,w3都是乙個完整累計視窗,且w6,w3無交集,w6,w3之間如同跳動視窗。

累計視窗主要適用於獲取從整點或整天開始,累計到當前時間的統計資料,一般完整累計視窗與離線週期對應,但卻需要獲取當前時刻的實時統計資料,例如實時獲取當天累計vv、當前小時累計uv。

pike examlpe:

withperiod 5m

select output(dt(outputctx())) as dt,

channelid,

count(1) as logcount,

count(distinct userid, ipvalue) as uv,

count(distinct if(strisnullorempty(vvid), userid + channelid, vvid)) as vv,

accumulate('1h', count(1)) as logcount_thishour,

accumulate('1h', linearcount(10000000, userid, ipvalue)) as uv_thishour

accumulate('1h', linearcountex(100, if(strisnullorempty(vvid), userid + channelid, vvid))) as vv_thishour,

accumulate('1d', count(1)) as logcount_thisday,

accumulate('1d', hyperloglogcount(5, userid, ipvalue)) as uv_thisday,

accumulate('1d', loglogadaptivecount(5, if(strisnullorempty(vvid), userid + channelid, vvid))) as vv_thisday

from dol_smart

group by channelid

基於gst launch的實時轉碼

目標是實現乙個實時轉碼,可用於iptv提供節目源。相關工作在ubuntu作業系統下進行。需要對源 進行修改的時候,直接採用apt get source命令獲取源 根據需要進行修改,然後安裝,這樣能最大限度的保證相容性和穩定性。命令列示例 gstreamer是通過不同功能的element構成pipel...

spark之實時統計

這篇部落格其實和spark之spark streaming處理檔案流資料區別不是特別的大,權可以看作為畢業設計作準備的,使用了執行緒和通訊的模式處理檔案流,最後對5秒內的輸入資料進行統計,如下 package openclass import org.apache.spark.streaming.d...

hbase基於solr的實時索引

實時查詢方案 hbase key value store solr web前端實時查詢展示 1.hbase 提供海量資料儲存 2.solr提供索引構建與查詢 3.key value store 提供自動化索引構建 從hbase到solr 使用流程 前提 cdh5.3.2solr集群搭建好,cdh5....