SparkSql實現Mysql到hive的資料流動

2022-03-28 08:01:23 字數 2971 閱讀 5053

今天去面試了一波,因為排程系統採用了sparksql實現資料從mysql到hive,在這一點上面試官很明顯很不滿我對於spark的理解,19年的第乙個面試就這麼掛了。

有問題不怕,怕的是知道了問題還得過且過。現在就來梳理下我的專案是怎麼使用spark導數的

第一步:把mysql中的表放入記憶體

properties.put("user", dbuser);

properties.put("password", dbpassword);

properties.put("driver", dbdriver);

datasetbizdateds = sparksession.read().jdbc(

dburl,

dbtablename,

properties

);

其中:org.apache.spark.sql.dataset(這裡面試官問我怎麼把mysql的資料轉化到spark,我沒答上來)

第二步:建立資料庫與表

2.1 建立庫

string createdbsql = "create database if not exists " + hivedbname + " location '" + dbpath + "'";

sparksession.sql(createdbsql);

```

2.2建立表

分成兩步,第一步讀取mysql元資料字段,第二步把這些字段建立出來

2.2.1 讀取mysql欄位

structtype structtype = bizdateds.schema();

structfield structfields = structtype.fields();

/*structfield是structtype中的字段。

param:name此字段的名稱。

param:datatype此字段的資料型別。

param:nullable指示此字段的值是否為空值。

param:metadata此字段的元資料。 如果未修改列的內容(例如,在選擇中),則應在轉換期間保留元資料。

*/

2.2.2  建立字段
string sourcetype; //name of the type used in json serialization.

string columnname;

string targettype;

structfield structfield;

sparkdatatypeenum sparkdatatype;

stringbuilder createbuilder = new stringbuilder(capacity);

listdbtablecolumns = lists.newarraylist();

mapdbtablecolumntypemap = maps.newhashmap();

//把mysql中的每個欄位都提取出來

for (int i = 0, len = structfields.length; i < len; i++)

sparkdatatype = sparkdatatypeenum.getitembytype(sourcetype);

if (null != sparkdatatype) else

dbtablecolumns.add(columnname);

dbtablecolumntypemap.put(columnname, targettype);

if (i != 0)

}sparksession.sql(createtablesql);

2.3 對比字段

我們在2.2中,如果hive有字段了,那麼就不會建立表。

問題在於,如果hive中的字段比mysql中的少怎麼辦?

2.3.1 獲取hive中的表字段

hiveutil connectiontohive = new hiveutil("org.apache.hive.jdbc.hivedriver", hiveurl, hiveuser, hivepassword);

public listgettablecolumns(string dbname,string tablename) throws sqlexception

databasemetadata metadata = connection.getmetadata();

rs = metadata.getcolumns(null, dbname, tablename.touppercase(), "%");

listcolumns = new arraylist();

while (rs.next())

return columns;

} catch (sqlexception e) finally

}}

2.3.2 對比字段並且新增:
for (string dbtablecolumn : dbtablecolumns) 

if (!hivetablecolumns.contains(dbtablecolumn))

}

2.4 將記憶體中的表存入hive
bizdateds.createorreplacetempview(tmptablename); //注意這裡不是直接從mysql抽到hive,而是先從mysql抽到記憶體中

insert hive_table select hive中的已經有的表的字段 from tmptablename

##很明顯的,如果不是需要和hive已經有的表互動根本用不到jdbc

sparksql實現單詞計數

1 建立sparksession val sparksession sparksession.builder master local 2 getorcreate 2 載入資料,使用dataset處理資料集 read來讀取可以直接返回dataset string 這是個比rdd更高階的資料集 它返回...

SparkSQL之關聯mysql和hive查詢

create database spark use spark create table dept deptno int 2 primary key,dname varchar 14 loc varchar 13 insert into dept value 10,accounting new yo...

SparkSQL的3種Join實現

大家知道,在資料庫的常見模型中 比如星型模型或者雪花模型 表一般分為兩種 事實表和維度表。維度表一般指固定的 變動較少的表,例如聯絡人 物品種類等,一般資料有限。而事實表一般記錄流水,比如銷售清單等,通常隨著時間的增長不斷膨脹。因為join操作是對兩個表中key值相同的記錄進行連線,在sparksq...