優雅的停止SparkStreaming

2021-10-08 16:03:30 字數 1602 閱讀 6995

背景:流式任務需要7*24小時執行,但是有時涉及到公升級**需要主動停止程式,但是分布式程式,沒辦法做到乙個個程序去殺死,所有配置優雅的關閉就顯得至關重要了。可以考慮使用外部檔案儲存或者關係型資料庫、快取等來控制內部程式關閉。

此例子使用hdfs建立指定檔案來控制程式的關閉,想要更好的在前端進行控制,可以在註冊程式中修改標誌源。

package com.smothclose

import org.apache.hadoop.conf.configuration

import org.apache.hadoop.fs.

import org.apache.spark.streaming.

class monitorstop(ssc: streamingcontext) extends runnable

val state: streamingcontextstate = ssc.getstate

val bool: boolean = fs.exists(new path("/stopsparktest"))

if (bool) }}

}}

package com.smothclose

import org.apache.log4j.

import org.apache.spark.sparkconf

import org.apache.spark.streaming.dstream.

import org.apache.spark.streaming.

object sparktest

// 如果為true,spark會streamingcontext在jvm關閉時正常關閉,而不是立即關閉。

sparkconf.set("spark.streaming.stopgracefullyonshutdown", "true")

val ssc = new streamingcontext(sparkconf, seconds(5))

ssc.checkpoint("./ck1111")

val line: receiverinputdstream[string] = ssc.sockettextstream("node01", 9999)

val word: dstream[string] = line.flatmap(_.split(" "))

val wordandone: dstream[(string, int)] = word.map((_, 1))

val wordandcount: dstream[(string, int)] = wordandone.updatestatebykey(update)

wordandcount.print()

ssc} def main(args: array[string]): unit =

}

會有乙個常駐執行緒來監控標誌位檔案是否存在,若存在檢查該ssc(streamingcontext)的狀態是否為活躍,若是活躍狀態進行停職該執行緒。

注:需要開啟優雅關閉配置(預設是false)

優雅的停止執行緒

thread類的start 方法啟動多執行緒,thread原本也有提供有停止方法stop 但從1.2開始已經廢除了,因為這種方法可能造成執行緒的死鎖,現在實現執行緒的停止需要通過一種柔和的方式進行。範例 實現執行緒柔和的停止 package multithreading public class s...

如何優雅的停止sparkstreaming程式

直接kill 9?不好吧,萬一我這個程式還在處理資料呢?還沒處理完呢?在處理的資料丟失了呢?但是我又想讓它先停一下呢?好了,直接上 吧 語言組織不好 import org.apache.hadoop.conf.configuration import org.apache.hadoop.fs.imp...

執行緒停止繼續 如何優雅的停止乙個執行緒?

在之前的文章中 i code.online 併發程式設計 執行緒基礎 我們介紹了執行緒的建立和終止,從原始碼的角度去理解了其中的細節,那麼現在如果面試有人問你 如何優雅的停止乙個執行緒?你該如何去回答尼 能不能完美的回答尼?這裡有個思考 當處於sleep時,執行緒能否感受到中斷訊號?對於執行緒的停止...