大資料上的流式SQL引擎 StreamCQL

2021-07-09 07:52:07 字數 3851 閱讀 8095

華為在近期的華為開發者大會上宣布開源其流處理平台sql引擎streamcql(stream continuous query language),表示歡迎更多的開發者加入社群,並將持續提供新的特性,目前在github**上已能看到其專案**和文件。

在最近的一次meetup上,華為實時分析團隊高階技術專家、streamcql核心作者何志強對該專案進行了介紹。streamcql作為華為fusioninsight大資料平台的重要元件,提供了在分布式流處理平台上的類sql查詢能力,包括視窗計算等高階特性,在開源之前已經在電信、金融等多個行業成功應用,此次streamcql開源目的是讓更多的社群開發愛好者、包括使用者、認證夥伴加入到專案的開源貢獻和應用中,以更好的開源專案來服務使用者,發展社群。華為作為大資料平台的重要提供商,最近加快了在開源上的步伐,在之前幾個月,其開源了另乙個大資料專案astro(spark sql on hbase package)。

streamcql是乙個類sql的宣告式語言,它用於在流(streams)和可更新關係(updatable relation)上的可持續查詢,目的是在流處理平台分布式計算能力之上,通過使用簡易通用的類sql語言,使得業務邏輯的開發變得統一和簡易。在功能上,streamcql彌補了傳統流處理平台上一些基本業務功能的缺失,除了過濾、轉換等基本sql能力之外, 還引入基於記憶體視窗的計算、統計、關聯等能力,以及流資料的拆分、合併等功能。下圖是streamcql的功能架構圖。

圖1:streamcql的功能架構圖

streamcql在邏輯架構上和通用的sql引擎比較相似,包含了語法分析、語義分析、邏輯優化、物理執行計畫和引擎適配這些部分。不同的是,由於處理的是流資料,所以streamcql必須定義流和視窗這些概念,並基於這些概念,定義相關的語法、運算元並進行邏輯優化和物理規劃。另外,通過引擎適配,streamcql除了可以支援目前主流的storm流處理引擎,也可以在將來支援其它的流處理引擎,如flink。

下面,先介紹一下streamcql中對流和視窗的定義。

圖2:流和視窗的關係

視窗概念的提出,讓流中的資料有了邊界,此時視窗中的資料等同於資料庫中的靜態表,所以可以資料表一樣被執行各種操作和查詢,在後面我們會給出相關的示例。streamcql支援的視窗種類也是比較多的,比如按事件數量、按時間範圍和按自然天的視窗,同時在視窗的移動上,也支援兩種方式,即跳動視窗和滑動視窗,如圖3和圖4所示。可以看出,跳動視窗是在固定的時間間隔下進行視窗的移動,一次性清除視窗中之前的資料,而滑動視窗則是在視窗時鐘的驅動下,視窗隨之滑動,將滑動到視窗之外的資料清除。

圖3:基於跳動視窗的示意圖

圖4:基於滑動視窗的示意圖

由於steamsql採用了類sql的語法,所以在編寫應用時要比使用原生的storm api簡潔的多,圖5是乙個簡單應用下的語法對比。

圖5:streamcql和storm api的語法對比

下面我們再舉一些實際的例子,來說明streamcql的在流處理場景中的實際使用。

第乙個流處理示例比較簡單,該示例會從kafka中讀取資料,然後統計條數,之後將統計結果寫入kafka的另外乙個topic中。

1)  首先建立乙個輸入流s1,使用預設的反序列化方式來解析資料。

create input stream s1  -- 指定輸入流的名稱

(id int, name string, type int)  -- 流中字段型別

source 

kafkainput  -- 指定資料來源及相關的屬性

properties (groupid = "cqlclient", topic = "cqlinput",

zookeepers = "127.0.0.1:2181", brokers = "127.0.0.1:9092" );

2)  然後建立輸出流,使用預設的序列化方式輸出資料。

create output stream s2  -- 指定輸出流的名稱

(cnt long)  -- 流中字段型別

sink kafakoutput  -- 指定資料目的及相關屬性

properties ( topic = "cqlout", zookeepers = "127.0.0.1:2181",

brokers = "127.0.0.1:9092" );

3)  接下來,執行從輸入流轉化到輸出流的streamcql語句,這個例子中只是統計事件的條數。

insert into stream s2 select count(*) from s1;
4)  最後,將上述語句構成的streamcql應用程式進行提交,並指定這個應用程式名稱為cql_kafka_example。一旦提交,系統就開始進行應用程式的解析和發布。

有了上面的基礎,我們可以看乙個更複雜點的例子,這個例子中會將2個輸入流(s1、s2)的資料進行合併,並標識**,然後將合併後的流與另乙個輸入流(s3)進行視窗的join操作,最終生成輸出流(rs):

1)  建立輸入流s1、s2和s3。

create input stream s1 (...) source kafkainput properties(...);

create input stream s2...;

create input stream s3...;

2)  建立輸出流rs。

create output stream rs(...) sink kafkaoutput properties(...)

3)  將s1和s2中的元組插入到臨時流s4中,並新增常量作為**判斷字段,1表示來自s1,2表示來自s2。

insert into s4 select *,1 from s1;

insert into s4 select *,2 from s2;

4)  將s4和s3的視窗資料經過內連線(inner join)後插入到輸出流中,具體的,s4的視窗是記錄數為10的跳動視窗,s3的視窗是3小時的滑動視窗。內連線的條件是s4的id欄位等於s3的type欄位,而且s4的id欄位要大於5。

insert into stream rs

select * from s4[rows 10 batch] 

inner join s3[range 3 hours slide]

on s4.id=s3.type

where s4.id > 5;

上面這個示例應用程式提交後生成的storm拓撲如圖6所示。

圖6:streamcql生成的join功能的storm拓撲

我們再看最後乙個示例,和上面流的連線相反,這是個流拆分的例子,即從1個輸入流拆分出3個輸出流。為了簡單起見,我們只給出關鍵部分的語句:

from teststream  -- 指定輸入流

insert into stream s1 select *  -- 輸入流所有欄位都插入到輸出流s1中

insert into stream s2 select a  -- 輸入流中字段a插入到輸出流s2中

insert into stream s3 select id, name where id > 10  --輸入流中字段id、name插入到輸出流s3中,且id必須大於10

prarllel 4;  -- 輸入運算元的併發數量

這個示例最終生成的storm拓撲如圖7所示:

圖7:streamcql生成的split功能的storm拓撲

上述就是streamcql的基本原理、架構和示例,簡而言之,它為流式應用提供了非常便捷的類sql的開發語言,你可以用它很方便的開發相關應用,完成基於流的各種查詢統計。最後,streamsql開源後也有了一些新的規劃方向,包括模式匹配、cqlserver、可靠性增強等。

SQL大資料優化上

考慮到處理資料一定要結合資料庫自身實際,此篇就先從常用的輔助效能分析工具說起,sql server profiler,效能計數器,執行計畫,sql prompt等工具,也許平時都正在應用,下邊結合自身使用進行介紹。sql server profiler sql server提供的監控工具,在執行命令...

主流大資料SQL引擎技術博弈,誰為王?

文章講的是主流大資料sql引擎技術博弈,誰為王,近日,atscale公布了第四季度主流大資料sql引擎的測試結果,主要針對spark impala hive tez以及presto。測試結果證實了我們早已實踐出的一些事情 impala是中等大小資料庫查詢的最佳選擇,並且已經積累了不少使用者,pres...

大資料架構中的流式架構和Kappa架構

1.流式架構 流式架構在大資料中應用十分廣泛,在傳統大資料架構的基礎上,流式架構非常激進,直接取消了批處理操作,資料全程以資料流的方式進行處理,所以在資料接入端沒有了etl操作,轉而替換為資料通道。而流式架構的優點十分明顯,流式架構的優點就是沒有十分麻煩的etl過程,資料的實效性非常高。當然,流式架...