Smart Traffic Metrics: Analyzing the Top 5 Checkpoints by High-Speed Vehicle Passes

2021-10-06 22:59:13 · 3,692 characters · 9,811 reads

Group by checkpoint and count the vehicles passing through at each speed category

val sql = "select * from traffic.monitor_flow_action"

val df = spark.sql(sql)

implicit val monitorFlowActionEncoder: Encoder[MonitorFlowAction] = ExpressionEncoder()

implicit val tupleEncoder: Encoder[(String, MonitorFlowAction)] = ExpressionEncoder()

implicit val stringEncoder: Encoder[String] = ExpressionEncoder()

implicit val speedSortKeyEncoder: Encoder[SpeedSortKey] = ExpressionEncoder()

implicit val stringAndSpeedSortKeyEncoder: Encoder[(String, SpeedSortKey)] = ExpressionEncoder()

val ds = df.as[MonitorFlowAction].map(action => (action.monitor_id, action))

ds.cache()

val ds1 = ds.groupByKey(tuple => tuple._1)
  .mapGroups((monitorId, x) => {
    var lowSpeed, normalSpeed, mediumSpeed, highSpeed = 0
    // Walk the group's records and classify each speed into one of four buckets
    while (x.hasNext) {
      val speed = x.next()._2.speed.toDouble
      if (speed < 60) lowSpeed += 1
      else if (speed >= 60 && speed < 90) normalSpeed += 1
      else if (speed >= 90 && speed < 120) mediumSpeed += 1
      else if (speed >= 120) highSpeed += 1
    }
    (monitorId, SpeedSortKey(lowSpeed, normalSpeed, mediumSpeed, highSpeed))
  })

Sort by speed category to get the top 5 checkpoints with the most high-speed vehicles
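The tallying inside mapGroups boils down to bucketing each speed into one of four ranges. A minimal standalone sketch of that logic (plain Scala, no Spark; `bucketSpeeds` is a hypothetical helper name, not part of the original code):

```scala
// Hypothetical helper mirroring the mapGroups tallying:
// bucket each speed into (low, normal, medium, high) counts.
def bucketSpeeds(speeds: Seq[Double]): (Int, Int, Int, Int) = {
  var low, normal, medium, high = 0
  speeds.foreach { speed =>
    if (speed < 60) low += 1          // low:    speed < 60
    else if (speed < 90) normal += 1  // normal: 60 <= speed < 90
    else if (speed < 120) medium += 1 // medium: 90 <= speed < 120
    else high += 1                    // high:   speed >= 120
  }
  (low, normal, medium, high)
}

// Example: four records at one checkpoint, one per bucket
// bucketSpeeds(Seq(45.0, 75.0, 100.0, 130.0)) == (1, 1, 1, 1)
```

Because the branches are mutually exclusive and exhaustive, each record is counted exactly once, which is what makes the later per-checkpoint sums comparable.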

Retrieve the monitoring records for the top 5 checkpoints

// Broadcast the top 5 checkpoint IDs

val broads = spark.sparkContext.broadcast(list)

// Filter the top 5 checkpoints' records out of the raw monitoring data

val ds2 = ds.filter(x => broads.value.contains(x._1))

ds2.map(_._2).createOrReplaceTempView("top10_speed_tmp")

For each of the top 5 checkpoints, get the top 10 vehicles passing at the highest speed

val df2 = spark.sql(
  "select " + args(0) + " as task_id, date, monitor_id, camera_id, car, action_time, speed, road_id, area_id " +
  "from (select *, row_number() over(partition by monitor_id order by speed desc) rank " +
  "from top10_speed_tmp) t where t.rank <= 10")

The same analysis using plain SQL

select *,
  if(speed >= 120, 1, 0) as highflag,
  if(speed >= 90 and speed < 120, 1, 0) as middelflag,
  if(speed >= 60 and speed < 90, 1, 0) as normalflag,
  if(speed < 60, 1, 0) as lowerflag
  -- equivalently: case when speed >= 120 then 1 else 0 end as highflag,
  --               case when speed < 60 then 1 else 0 end as lowerflag
from monitor_flow_action;

tmp1:
*, highflag:1, middelflag:0, normalflag:0, lowerflag:0
*, highflag:0, middelflag:1, normalflag:0, lowerflag:0


select monitor_id,
       sum(highflag) highcnt,
       sum(middelflag) middelcnt,
       ....
from tmp1
group by monitor_id
order by highcnt desc, middelcnt desc, ...
limit 5
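The two-step SQL above (flag each record, then sum the flags per checkpoint, sort, and take 5) can be mirrored with plain Scala collections. A sketch under the assumption that records arrive as `(monitor_id, speed)` pairs; `top5ByHighSpeed` is a hypothetical name, and only the first two sort keys are shown:

```scala
// Sketch of the flag-and-sum approach over in-memory (monitorId, speed) pairs.
def top5ByHighSpeed(records: Seq[(String, Double)]): Seq[String] = {
  records
    .groupBy(_._1)                                         // group by monitor_id
    .map { case (id, rs) =>
      val high   = rs.count(_._2 >= 120)                   // sum(highflag)
      val middle = rs.count(r => r._2 >= 90 && r._2 < 120) // sum(middelflag)
      (id, (high, middle))
    }
    .toSeq
    .sortBy { case (_, (h, m)) => (-h, -m) }               // order by highcnt desc, middelcnt desc
    .take(5)                                               // limit 5
    .map(_._1)
}
```

Negating the counts inside `sortBy` gives a descending order without a custom Ordering, matching the `order by ... desc` clauses.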

val ds1 = ds.groupByKey(_._1).mapGroups((monitorId, x) => {
  var lowSpeed, normalSpeed, mediumSpeed, highSpeed = 0
  // Iterate over the speeds, classify and tally the four buckets
  while (x.hasNext) {
    val speed = x.next()._2.speed.toDouble
    if (speed < 60) lowSpeed += 1
    else if (speed >= 60 && speed < 90) normalSpeed += 1
    else if (speed >= 90 && speed < 120) mediumSpeed += 1
    else if (speed >= 120) highSpeed += 1
  }
  (monitorId, SpeedSortKey(lowSpeed, normalSpeed, mediumSpeed, highSpeed))
})

import spark.implicits._

val arr = ds1.sort($"_2".desc).take(5)

val list = arr.map(item => item._1)

val monitorIds = list.map(item => HighSpeed(args(0), item))

val broads = spark.sparkContext.broadcast(list)

val ds2 = ds.filter(x => broads.value.contains(x._1))

ds2.map(_._2).createOrReplaceTempView("top10_speed_tmp")

val df2 = spark.sql(
  "select " + args(0) + " as task_id, date, monitor_id, camera_id, car, action_time, speed, road_id, area_id " +
  "from (select *, row_number() over(partition by monitor_id order by speed desc) rank " +
  "from top10_speed_tmp) t where t.rank <= 10")

    spark.close()
  }
}

// Custom sort key

case class SpeedSortKey(lowSpeed: Int, normalSpeed: Int, mediumSpeed: Int, highSpeed: Int)
    extends Ordered[SpeedSortKey] with Serializable {

  // Compare by highSpeed first, then mediumSpeed, normalSpeed, and lowSpeed
  override def compare(that: SpeedSortKey): Int = {
    if (this.highSpeed - that.highSpeed != 0) this.highSpeed - that.highSpeed
    else if (this.mediumSpeed - that.mediumSpeed != 0) this.mediumSpeed - that.mediumSpeed
    else if (this.normalSpeed - that.normalSpeed != 0) this.normalSpeed - that.normalSpeed
    else if (this.lowSpeed - that.lowSpeed != 0) this.lowSpeed - that.lowSpeed
    else 0
  }

  override def toString: String =
    "SpeedSortKey [lowSpeed=" + lowSpeed + ", normalSpeed=" + normalSpeed +
      ", mediumSpeed=" + mediumSpeed + ", highSpeed=" + highSpeed + "]"
}

case class HighSpeed(task_id: String, monitor_id: String)

case class MonitorFlowAction(date: String, monitor_id: String, camera_id: String, car: String, action_time: String, speed: String, road_id: String, area_id: String)
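A quick standalone check that the SpeedSortKey ordering ranks checkpoints by high-speed count first, falling back to the lower buckets on ties. The class is re-declared here (without the Spark-specific Serializable mixin) so the snippet compiles on its own:

```scala
// Self-contained copy of the custom sort key for demonstration purposes.
case class SpeedSortKey(lowSpeed: Int, normalSpeed: Int, mediumSpeed: Int, highSpeed: Int)
    extends Ordered[SpeedSortKey] {
  override def compare(that: SpeedSortKey): Int =
    if (this.highSpeed != that.highSpeed) this.highSpeed - that.highSpeed
    else if (this.mediumSpeed != that.mediumSpeed) this.mediumSpeed - that.mediumSpeed
    else if (this.normalSpeed != that.normalSpeed) this.normalSpeed - that.normalSpeed
    else this.lowSpeed - that.lowSpeed
}

val keys = Seq(
  SpeedSortKey(5, 3, 2, 1), // many slow passages, one high-speed
  SpeedSortKey(0, 0, 0, 9), // nine high-speed passages
  SpeedSortKey(0, 0, 4, 1)  // ties on highSpeed, wins on mediumSpeed
)

// Descending sort puts the checkpoint with the most high-speed passages first.
// keys.sorted.reverse.head == SpeedSortKey(0, 0, 0, 9)
```

Extending `Ordered` is what lets Spark (and plain collections) sort the `(monitor_id, SpeedSortKey)` pairs without an explicit comparator.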
