Structured Streaming 輸入輸出

2021-08-02 22:54:04 字數 1987 閱讀 4021

sparksession.readstream()返回乙個datastreamreader介面物件,可以通過該物件對輸入源進行引數配置,最後返回dataframe/dataset物件。

val csvdf = spark

.readstream

.option("sep", ";")

.schema(userschema)

.csv("/path/to/directory")

val inputstream = spark.readstream

.format("kafka")

.option("kafka.bootstrap.servers", "127.0.0.1:9092")

.option("subscribe", "testss")

.load()

val socketdf = spark

.readstream

.format("socket")

.option("host", "localhost")

.option("port", 9999)

.load()

具體輸入配置參考建立

complete模式:每次會把整個result table輸出,所以只支援聚合操作。

update模式:只有更新的資料才會輸出到輸出端(記憶體中維護了上次觸發後的結果)。

不同的流查詢操作支援不同的輸出模式,如下表所示:

查詢型別

支援的模式

原因非聚合操作

updatecomplete模式不支援是因為需要在result table中維護所有資料,這是不太現實的

基於watermark的視窗聚合操作

update

complete其他聚合操作

update

complete

writestream

.format("parquet") // can be "orc", "json", "csv", etc.

.option("path", "path/to/destination/dir")

.start()

writestream

.foreach(...)

.start()

writestream

.format("console")

.start()

writestream

.format("memory")

.queryname("tablename")

.start()

val query = wordcounts.writestream.trigger(processingtime(5.seconds))

.outputmode("complete")

.foreach(new foreachwriter[row]

override def

close

(errorornull: throwable): unit =

override def

open

(partitionid: long, version: long): boolean = "))

filewriter = new filewriter(new file(s"/tmp/example/$/temp"))

true

}}).start()

Structured Streaming 開發入門

structured streaming 作為 spark 家族的新成員,通過 spark sql dataframe 來處理 batch streaming 資料,基本的 sparksql api 即可實現離線處理和流式處理,大大的方便了流式計算的開發,另外還提供了豐富的功能。structured...

Linux shell shell的輸入與輸出

大多數使用標準輸入的命令都指定乙個檔案作為標準輸入 1.echo echo hello word 將輸出hello word 如果想把hello word輸出到檔案中中 使用重定向符號 下面命令將helloword字元寫入myfile檔案中 echo hello word myfile 2.read...

CPrimerPlus學習(十三) 檔案輸入輸出

程式清單13.1 count.c程式 count.c 使用標準 i o include include 提供 exit 的原型 intmain int argc,char ar if fp fopen ar 1 r null while ch getc fp eof fclose fp printf...