DataWorks支援PyODPS型別任務

2021-08-19 23:38:42 字數 2446 閱讀 2253

摘要: 昨天,dataworks推出了pyodps任務型別,整合了maxcompute的python sdk,可在dataworks的pyodps節點上直接編輯python**操作maxcompute,也可以設定排程任務來處理資料,提高資料開發效率。

昨天,dataworks推出了pyodps任務型別,整合了maxcompute的python sdk,可在dataworks的pyodps節點上直接編輯python**操作maxcompute,也可以設定排程任務來處理資料,提高資料開發效率。

效果如下圖

只有華東2(上海)region 支援了 pyodps 節點。

注:底層的 python 版本為 2.7 。

新建 pyodps 節點具體操作如下:

1) 單擊資料開發頁面工具欄中的 新建 > 新建任務。2) 填寫新建任務彈出框中的各配置項。

3) 單擊建立

dataworks 的 pyodps 節點中,將會包含乙個全域性的變數 odps 或者 o ,即 odps 入口。使用者不需要手動定義 odps 入口。

print(odps.exist_table('pyodps_iris'))
pyodps支援odps sql的查詢,並可以讀取執行的結果。 execute_sql 或者 run_sql 方法的返回值是 執行例項 。

註解:並非所有在 odps console 中可以執行的命令都是 odps 可以接受的 sql 語句。 在呼叫非 ddl / dml 語句時,請使用其他方法,例如 grant / revoke 等語句請使用 run_security_query 方法,pai 命令請使用 run_xflow 或 execute_xflow 方法。

>>> o.execute_sql('select * from dual')  #  同步的方式執行,會阻塞直到sql執行完成

>>>

>>> instance = o.run_sql('select * from dual') # 非同步的方式執行

>>> print(instance.get_logview_address()) # 獲取logview位址

>>> instance.wait_for_success() # 阻塞直到完成

有時,我們在執行時,需要設定執行時引數,我們可以通過設定 hints 引數,引數型別是dict。

我們可以對於全域性配置設定sql.settings後,每次執行時則都會新增相關的執行時引數。

>>> from odps import options

>>> o.execute_sql('select * from pyodps_iris') # 會根據全域性配置新增hints

執行 sql 的 instance 能夠直接執行 open_reader 的操作,一種情況是sql返回了結構化的資料。

>>> 

with o.execute_sql('select * from dual').open_reader() as reader:

>>>

for record in reader:

>>>

# 處理每乙個record

另一種情況是 sql 可能執行的比如 desc,這時通過 reader.raw 屬性取到原始的sql執行結果。

>>> 

with o.execute_sql('desc dual').open_reader() as reader:

>>> print(reader.raw)

pyodps節點使用排程引數需要注意一下,系統定義的排程引數,可以直接通過此方法獲取。

自定義引數的使用,需要使用單獨的方法獲取。

在全域性包括乙個 args 物件,可以在這個中獲取,它是乙個dict型別。

測試執行結果如下:

請注意:在資料開發下,使用了自定義排程引數,頁面上直接觸發執行pyodps節點時,需要寫死時間,pyodps節點無法像sql一樣直接替換。

排程請參考:

DataWorks支援PyODPS型別任務

昨天,dataworks推出了pyodps任務型別,整合了maxcompute的python sdk,可在dataworks的pyodps節點上直接編輯python 操作maxcompute,也可以設定排程任務來處理資料,提高資料開發效率。效果如下圖 只有華東2 上海 region 支援了 pyod...

python mysql資料庫連線 pyodbc

import pyodbc cnxn pyodbc.connect driver server database uid pwd cursor cnxn.cursor cursor.execute select id from datatable row cursor.fetchone 其中 其中 ...

DataWorks新手引導 持續更新

a dataworks還未採用這種授權方式哈!dataworks給子賬號使用的流程是 主賬號建立專案 主賬號新建子賬號 將子賬號加入專案並賦予角色 子賬號登入及更新個人資訊 注 主賬號新建子賬號時,建立ak這一步的時候,ak一定要儲存好,不然子賬號在更新個人資訊的時候,還需要主賬號重新去建立一次ak...