超時流式處理：沒有訊息流入的資料異常監控

2021-09-22 17:19:36 字數 4315 閱讀 7455

-- Items being watched for timeouts. A row is "timed out" once its deadline passes while it
-- is still not completed; the polling query then notifies it and re-arms a repeat reminder.
create table tbl (
    id            bigint,
    -- ... other business columns (e.g. the completion state below) ...
    state         integer,            -- completion flag: 1 = completed
    deadts        timestamp,          -- deadline after which the item counts as timed out
    nts           interval,           -- gap between repeat notifications, e.g. '1 day' = notify once a day
    notify_times  integer default 0,  -- how many notifications have been sent so far
    deadts_next   timestamp           -- when the next repeat notification is due (null until the first one)
    -- NOTE(review): deadts/deadts_next are compared against now() (timestamptz) by the polling
    -- query, so their meaning depends on the session TimeZone; timestamptz would remove that
    -- dependency. Left as timestamp to keep the column types unchanged.
);

-- Partial index for first-time timeout notifications: only rows never notified
-- (notify_times = 0) and not completed (state <> 1), ordered by deadline.
-- notify_times and state are deliberately not key columns. Inside this index notify_times is
-- always 0, and the polling query only tests state <> 1, which the predicate already
-- guarantees. As key columns they would only widen every entry without narrowing any scan.
create index idx_tbl_1 on tbl (deadts) where notify_times = 0 and state <> 1;

-- Partial index for repeat notifications: only rows that already have a next-reminder time
-- scheduled and are not completed, ordered by that time.
-- state is deliberately not a key column. The polling query only tests state <> 1, which
-- the predicate already guarantees, so indexing it would just make each entry wider.
create index idx_tbl_2 on tbl (deadts_next) where deadts_next is not null and state <> 1;

-- Poll one batch of timed-out rows that are not completed (state <> 1), mark them as
-- notified, and return them to the caller.
-- A row is due when either:
--   * it has never been notified (notify_times = 0) and its deadline deadts has passed, or
--   * its next scheduled reminder deadts_next has passed.
-- Each returned row gets notify_times + 1 and deadts_next = now() + nts.
-- (The execution plan printed after this statement was captured before the re-check
--  predicates below were added. With them, the TID scan also shows a Filter line.)
with tmp1 as (
    update tbl
    set deadts_next  = now() + nts,       -- schedule the next reminder one interval from now
        notify_times = notify_times + 1
    where ctid = any (array(
              -- never notified and past the deadline (served by idx_tbl_1)
              select ctid from tbl
              where deadts < now() and notify_times = 0 and state <> 1
              union all
              -- a repeat reminder is due (served by idx_tbl_2; "is not null" mirrors its predicate)
              select ctid from tbl
              where deadts_next < now() and deadts_next is not null and state <> 1
              limit 10000                   -- caps the whole union: at most 10,000 rows per batch
          ))
      -- Re-check eligibility on the row version actually being updated. The ctid list comes
      -- from the statement snapshot. Suppose another session concurrently completes
      -- (state = 1) or notifies one of these rows. Under READ COMMITTED, this UPDATE waits
      -- for that session and then re-evaluates its own WHERE clause against the new row
      -- version. Without the predicates below, the only condition left would be
      -- ctid = any(...), which says nothing about state or due time, so a just-completed
      -- row may still be bumped and returned. With no concurrent writers, every listed row
      -- satisfies these predicates, so the result is unchanged.
      and state <> 1
      and ((deadts < now() and notify_times = 0) or deadts_next < now())
    returning *
)
select * from tmp1;

cte scan on tmp1  (cost=18163.25..18163.45 rows=10 width=48)

cte tmp1

-> update on tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54)

initplan 1 (returns $0)

-> limit (cost=0.13..18151.03 rows=10000 width=6)

-> index scan using idx_tbl_1 on tbl (cost=0.13..169527.13 rows=369766 width=6)

index cond: (deadts < now())

-> index scan using idx_tbl_2 on tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6)

index cond: (deadts_next < now())

-> tid scan on tbl tbl_2 (cost=0.01..12.21 rows=10 width=54)

tid cond: (ctid = any ($0))

(12 rows)

-- Test data: 100 million rows that are already completed (state = 1). The polling query
-- must skip these, and the partial indexes exclude them entirely.
-- Explicit column list so the statement does not silently depend on column order.
insert into tbl (id, state, deadts, nts, notify_times, deadts_next)
select id, 1, now(), '5 min', 0, null
from generate_series(1, 100000000) as t(id);

-- Test data: 1 million open rows (state = 0) whose deadline is the insert time. They are
-- therefore already due on the next poll, with a 5-minute repeat interval.
-- NOTE(review): ids 1..1000000 overlap the completed batch above. That is harmless here
-- because tbl has no key on id, but it means id alone does not identify a row.
-- Explicit column list so the statement does not silently depend on column order.
insert into tbl (id, state, deadts, nts, notify_times, deadts_next)
select id, 0, now(), '5 min', 0, null
from generate_series(1, 1000000) as t(id);

-- Poll one batch of timed-out rows that are not completed (state <> 1), mark them as
-- notified, and return them to the caller.
-- A row is due when either:
--   * it has never been notified (notify_times = 0) and its deadline deadts has passed, or
--   * its next scheduled reminder deadts_next has passed.
-- Each returned row gets notify_times + 1 and deadts_next = now() + nts.
-- (The execution plan printed after this statement was captured before the re-check
--  predicates below were added. With them, the TID scan also shows a Filter line.)
with tmp1 as (
    update tbl
    set deadts_next  = now() + nts,       -- schedule the next reminder one interval from now
        notify_times = notify_times + 1
    where ctid = any (array(
              -- never notified and past the deadline (served by idx_tbl_1)
              select ctid from tbl
              where deadts < now() and notify_times = 0 and state <> 1
              union all
              -- a repeat reminder is due (served by idx_tbl_2; "is not null" mirrors its predicate)
              select ctid from tbl
              where deadts_next < now() and deadts_next is not null and state <> 1
              limit 10000                   -- caps the whole union: at most 10,000 rows per batch
          ))
      -- Re-check eligibility on the row version actually being updated. The ctid list comes
      -- from the statement snapshot. Suppose another session concurrently completes
      -- (state = 1) or notifies one of these rows. Under READ COMMITTED, this UPDATE waits
      -- for that session and then re-evaluates its own WHERE clause against the new row
      -- version. Without the predicates below, the only condition left would be
      -- ctid = any(...), which says nothing about state or due time, so a just-completed
      -- row may still be bumped and returned. With no concurrent writers, every listed row
      -- satisfies these predicates, so the result is unchanged.
      and state <> 1
      and ((deadts < now() and notify_times = 0) or deadts_next < now())
    returning *
)
select * from tmp1;

-- 計畫

cte scan on tmp1 (cost=18163.25..18163.45 rows=10 width=48) (actual time=39.092..78.707 rows=10000 loops=1)

output: tmp1.id, tmp1.state, tmp1.deadts, tmp1.nts, tmp1.notify_times, tmp1.deadts_next

buffers: shared hit=75094 read=49 dirtied=49

cte tmp1

-> update on public.tbl tbl_2 (cost=18151.05..18163.25 rows=10 width=54) (actual time=39.089..74.637 rows=10000 loops=1)

output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, tbl_2.notify_times, tbl_2.deadts_next

buffers: shared hit=75094 read=49 dirtied=49

initplan 1 (returns $0)

-> limit (cost=0.13..18151.03 rows=10000 width=6) (actual time=31.265..36.899 rows=10000 loops=1)

output: tbl.ctid

buffers: shared hit=11395

buffers: shared hit=11395

-> index scan using idx_tbl_1 on public.tbl (cost=0.13..169527.13 rows=369766 width=6) (actual time=0.014..0.014 rows=0 loops=1)

output: tbl.ctid

index cond: (tbl.deadts < now())

buffers: shared hit=1

-> index scan using idx_tbl_2 on public.tbl tbl_1 (cost=0.43..590959.46 rows=51535 width=6) (actual time=31.249..33.870 rows=10000 loops=1)

output: tbl_1.ctid

index cond: (tbl_1.deadts_next < now())

buffers: shared hit=11394

-> tid scan on public.tbl tbl_2 (cost=0.01..12.21 rows=10 width=54) (actual time=39.017..43.529 rows=10000 loops=1)

output: tbl_2.id, tbl_2.state, tbl_2.deadts, tbl_2.nts, (tbl_2.notify_times + 1), (now() + tbl_2.nts), tbl_2.ctid

tid cond: (tbl_2.ctid = any ($0))

buffers: shared hit=21395

planning time: 0.301 ms

execution time: 79.905 ms

time: 79.905 ms

基於Spark的流式資料處理：DStream概述

Spark Streaming工作機制、Spark Streaming程式的基本步驟、建立StreamingContext物件。Spark Streaming工作機制：在Spark Streaming中，會有一個元件Receiver，作為一個長期執行的Task跑在一個Executor上。每個Rece...

訊息佇列：怎麼處理訊息佇列中重複的資料

在分布式系統中，上游發來的訊息佇列必然會存在重複的資料，這是不可避免的。Producer在傳送訊息時，比如遇到網路問題，傳送後因超時得不到伺服器的ACK，就會進行重發。如果傳送的訊息內容是銀行扣款，那麼發生的問題可想而知。有人會問：為什麼中介軟體不能幫我們做排重？中介軟體排重會有以下問題：名稱質量標準 ...

資料庫連線池的超時處理

資料庫連線池能夠減少連線的頻繁建立，達到節約資源的效果；但是連線遠端資料庫時，會出現連線池被快速耗盡的問題。如果在連線池裡監視超時的連線、超過時間就關閉，往往會導致資料庫異常，因此只能在呼叫遠端資料庫連線的實際操作程式中去設定。private void updatestat...