redis stream中pending資料的處理

2021-10-10 11:43:40 字數 3060 閱讀 9181

在消費者組模式下,當乙個訊息被消費者取出,為了解決組內訊息讀取但處理期間消費者崩潰帶來的訊息丟失問題,stream 設計了 pending 列表,用於記錄讀(xreadgroup)取但並未處理完畢(未ack)的訊息。

下面的討論基於幾點:

面向的場景為多個無差別消費者(每個消費者名子相同,功能相同)在同一group下消費任務。

我們要保證的是,每個任務至多只做一次。

**實現是在使用redis stream實現佇列服務一文的封裝基礎上實現的。

如果你的處理邏輯是:

gettask()

deltask()

yourprocessfuc();

即不太關注任務的丟失,此時無需做什麼特別處理。但一定記得deltask(),不然pending佇列會越積越多,占用大量儲存空間。

/*

* 將pending佇列中超時的資料重新放回佇列

* * $idletime: 超時時間, 毫秒

* $perpage:每次從pending佇列中取的任務數, 之所以分頁是為防止佇列太長,一下取出記憶體不夠

** 注意:只能有乙個程序執行pendingrestore

** 優點: consumer不需要做任何改動

* 缺點:

* 先del再add, 成本上不划算,

* 如果del和add中間斷掉任務就丟了

* 無法保留任務被重複投遞的次數,不過如果你的任務只想重做一次,或者不關注此資料那就無所謂了。

* * return: restore的數量

* */

public function pendingrestore($idletime = 5000, $perpage = 20)

}$restorenum += $thisnum;

if ($thisnum < $perpage)

}return $restorenum;

}/* 從pending佇列中取任務

*/protected function getpending($count = 1, $start='-', $end='+', $consumer = null)

return $this->_mredis->xpending($this->_mstream, $this->_mgroup, $start, $end, $count, $consumer);}/*

* 取[$start, $end]範圍內的資料, 注意是閉區間

** $count:條數,null時表示取全部

* */

protected function getrange($start = '-', $end = '+', $count = null)else

}

將超時任務放入另乙個名子的消費者pending佇列中,然後從新的消費者歷史資料中取出資料並處理。

/*

* 另一種恢復超時任務的方法

* 思路:將超時任務放入newconsumer的pending中,後續可以從newconsume的歷史中取出資料並處理

** 優點:

* 恢復資料沒有重複讀,刪,插,效率高

* 任務投遞次數會保留在新的pending中

** 缺點:

* consumer需要做改動,至少要改變consumer的名子

* 只能用單程序從歷史資料中讀資料,然後處理。**

* $idletime: 超時時間, 毫秒

* $newconsumer: 之後處理pending任務的消費者名稱

* $perpage: 每次取pending任務的條數

** return: 滿足條件且成功claim的條數

* */

public function pendingclaim($idletime = 5000, $newconsumer=null, $perpage = 20)

$info = $this->getpendinginfo();

$startid = $info[1];

$endid = $info[2];

$claimnum = 0;

/** 使用startid, endid遍歷pending列表

* 因為getpending取的是[startid, endid]

* 所以邊界處的id可能被重複取出,但不影響結果的正確性

* perpage越大/符合xclaim條件的id越多,重複的可能性越小

* */

while($startid != $endid)

//xclaim會根據條件自動過濾任務

$res = $this->_mredis->xclaim($this->_mstream, $this->_mgroup, $newconsumer, $idletime, $ids, ['justid']);

$thisnum = count($res);

$claimnum += $thisnum;

//id是按時間排列,小id未超時,則後面不會超時

//在所有id都有相同的投遞次數的基礎上

if ($thisnum < $perpage)

}return $claimnum;

}

使用pendingclaim後,可以使用乙個單獨程序,通過下面方式獲取超時任務並處理。

$config = [

'server' => '10.10.10.1:6379',

'stream' => 'balltube',

'consumer' => 'pendingprocessor',//pendingclaim中的newconsumer

];$q = new redisqueue($this->_config);

$block = 1000;

$num = 1;

while(1)

$id = key($d);

$data = $d[$id];

$q->deltask($id);

//處理任務邏輯

yourtaskprocessfunc($data);

}

c 畫筆Pen畫虛線

using system using system.collections.generic using system.componentmodel using system.data using system.drawing using system.text using system.window...

c 畫筆Pen畫虛線

using system using system.collections.generic using system.componentmodel using system.data using system.drawing using system.text using system.window...

c 畫筆Pen繪製曲線

using system using system.collections.generic using system.componentmodel using system.data using system.drawing using system.text using system.window...