利用Kettle進行資料同步(下)

2021-09-13 17:02:32 字數 4481 閱讀 2188

技術匯(id: jishuhui_2015)可聯絡到作者。

上篇介紹了基於kettle的資料同步工程的搭建,entrypoint.kjb就是整個工程執行的入口。

為了進一步降低操作成本,讓整個資料同步過程更穩定、安全,需要進行更高層面的抽象,做成乙個簡單易用的系統。

以下是應用截圖:

除了選擇資料來源和資料庫之外,還加入了授權碼,意味著授權範圍內的使用者才能使用該系統。

因為是內部使用,授權使用者還沒實現後台管理,直接往應用資料庫裡新增,所選擇的資料來源和資料庫都是通過配置檔案生成的。

資料庫名稱:kettle,目前有兩張表:

1、授權使用者表。表內記錄的使用者即可使用資料同步系統

create table `authorized_user` (

`id` int(11) not null auto_increment comment '使用者id,自增',

`user` varchar(128) not null comment '使用者名稱,全域性唯一',

`token` varchar(20) not null comment '使用者的授權碼,全域性唯一',

`status` char(1) not null default 'a' comment '授權使用者狀態:a-已授權,r-未授權',

`gmt_create` datetime not null comment '建立時間',

`gmt_modify` datetime not null comment '最後修改時間',

primary key (`id`),

unique key `unique_index_token` (`token`) using btree,

unique key `unique_index_user` (`user`) using btree

) engine=innodb auto_increment=1 default charset=utf8 comment='授權使用者表'

2、同步記錄表。記錄使用者的資料同步操作

create table `sync_record` (

`sync` varchar(20) not null comment '同步記錄主鍵',

`ipv4` varchar(15) not null comment 'ip位址',

`from_db` varchar(100) not null comment '源資料',

`to_db` varchar(100) not null comment '目標資料',

`user` varchar(128) not null comment '使用者名稱',

`token` varchar(20) not null comment '使用者的授權碼',

`status` char(1) not null default 'p' comment '同步狀態:p-正在執行,s-成功,f-失敗',

`gmt_create` datetime not null comment '同步建立時間',

`gmt_modify` datetime not null comment '最後修改時間',

primary key (`sync`)

) engine=innodb default charset=utf8 comment='同步記錄表';

因為系統做得比較簡單實用,沒有什麼特別設計之處。筆者重點說三點:

1、資料來源及其引數配置

env:

entry-point: kettle/entrypoint.kjb

databases:

- taxi-user

- taxi-account

- taxi-trade

- taxi-coupon

- taxi-bi

- taxi-system

- taxi-credits

- taxi-finance

- taxi-notification

- taxi-gateway

from-dbs:

- prod

- test

- local

to-dbs:

- local

- test

db-settings:

- name: local

host: *****

port: 3306

user: *****

password: *****

- name: test

host: *****

port: 3306

user: *****

password: *****

- name: prod

host: *****

port: 3306

user: *****

password: *****

利用了springboot的@configurationproperties的註解。

@setter

@getter

@configurationproperties(prefix = "env")

public class envconfig

}

當中的dbsetting的定義如下所示:

@setter

@getter

@noargsconstructor

public class dbsetting

}

通過客戶端傳來的引數,即可定位到對應的引數設定。

2、整合kettle的api

nexus

公司內部的nexus的url

*,!pentaho-releases

其中的mirrorof節點加上了!pentaho-releases,表示排除pentaho-releases。

然後,在springboot工程中的pom.xml中指定pentaho-releases的url。

pentaho-releases

接下來是核心的對接**,具體可以參照工程原始碼。

jobmeta jobmeta = getjobmeta(new classpathresource(envconfig.getentrypoint()));

job job = new job(null, jobmeta);

//設定variable

job.setvariable("sync", sync);

job.setvariable("to_host", todbsetting.gethost());

job.setvariable("to_db", form.getdb());

job.setvariable("to_user", todbsetting.getuser());

job.setvariable("to_password", todbsetting.getpassword());

job.setvariable("to_port", todbsetting.getport());

job.setvariable("from_host", fromdbsetting.gethost());

job.setvariable("from_db", form.getdb());

job.setvariable("from_user", fromdbsetting.getuser());

job.setvariable("from_password", fromdbsetting.getpassword());

job.setvariable("from_port", fromdbsetting.getport());

job.start(); //開始執行job

job.waituntilfinished(); //等待job完成

3、非同步執行作業因為乙個job的執行時間可能會很長,這個主要是看資料量的多少,所以乙個request的來回可能會導致timeout,所以需要改為非同步的模式。

其核心的思想是:啟動新的執行緒,客戶端定時輪詢執行結果。

筆者分兩篇文章介紹了如何利用kettle進行資料同步,並實現乙個簡易的系統,降低操作成本和出錯率。

歡迎fork我的工程**(

如何進行資料同步

建立資料庫的鏈結 create database link dblink test connect to 需要鏈結的資料庫的名字 identified by 密碼 using 這個鏈結的別名 建立物化檢視用於同步資料 create materialized view test test是同步過來的實...

rsync sersync進行資料同步備份

sersync功能多 支援配置檔案管理 真正的守護程序 可以對失敗檔案定時重傳 第三方的http介面 預設多執行緒 cdn更新 sercver 192.168.138.131 配置與之前的一樣 client 192.168.138.132 網查參考了很多人家的部落格,然後總結寫出來的 客戶端的配置 ...

利用oradata進行資料恢復

之前因為現場對方工程師的原因,把oracle其他資料夾都給格了,只剩下oradata資料夾拷貝出來了,基於oracle9i版本 第一步 搭建乙個和之前一模一樣的資料庫系統環境 第三步 將oradata資料夾覆蓋過去 第四步 啟動服務,登入之後應該會發現普通使用者無法登入,會報錯ora 01033 o...