ceph中的SafeTimer類詳解

2021-09-25 09:19:06 字數 4828 閱讀 6801

safetimer的使用

ceph中的事件都是整合自context類,事件處理方法是finish(),這是乙個純虛函式,由不同的事件完成自己的處理方法。

ceph的定時器原理是開啟執行緒,迴圈檢查是否有事件需要執行,如果沒有就掛起,等待觸發條件到來。主要用於集群中的某些特定任務,比如心跳機制,心跳機制就是在finish()中每隔一段事件ping一次。

safetimer中是迴圈檢查需要執行的事件,所以維護了兩個map,乙個是包含時間和事件資訊的schedule,其中的事件按照時間排序,safetimer會檢視schedule中的第乙個任務是否到期,如果到了就呼叫callback函式執行,如果沒有就直接退出迴圈,是任務按時間執行的依據;另乙個是events,主要用來檢查任務是否正確新增或取消。

名稱型別

keyvalue

schedule

multimap

utime_t

context

events

mapcontext

schedule 的迭代器

該類是從thread類繼承,擁有乙個safetimer類成員變數和entry()成員函式,safetimer中的成員thread為此型別,承擔了定時器迴圈檢查是否有事件觸發需要執行相應程式的任務。

class

safetimerthread

:public thread

void

*entry

() override

};

safetimer類中有若干成員函式,包括

名稱作用

引數init()

定時器初始化

none

shutdown()

關閉定時器

none

timer_thread()

定時器的迴圈執行緒

none

add_event_after()

一段時間後新增事件

seconds ,callback

add_event_at()

在某個時間點新增事件

when, callback

cancel_event()

取消某個事件

callback

cancel_all_events()

取消所有事件

none

dump()

caller

init()

初始化並建立定時器的執行緒

void safetimer::

init()

shutdown()

關閉定時器,join()函式應該會使得日誌中先顯示timer_thread awake然後才是timer_thread exiting

void safetimer::

shutdown()

}

為什麼執行緒都結束了還要lock一下?

timer_thread()

這裡是safetimer執行緒執行的核心部分,配合流程圖分析。該執行緒只要定時器不停止就一直存在,迴圈檢查,有事件需要執行就去呼叫相應的callback函式執行;沒有的時候就處於掛起狀態,直達有新的事件新增進來,或者時間到了需要執行任務,該執行緒將會被變數cond的signal喚醒

void safetimer::

timer_thread()

// recheck stopping if we dropped the lockif(

!safe_callbacks && stopping)

break

;ldout

(cct,20)

<<

"timer_thread going to sleep"

<< dendl;

if(schedule.

empty()

) cond.

wait

(lock)

;//這裡執行緒會進入等待佇列,掛起不執行後面的程式?直到由任務新增進來需要執行

else

cond.

waituntil

(lock, schedule.

begin()

->first)

;、而如果是執行事件還未到,那就掛起到時間條件滿足

ldout

(cct,20)

<<

"timer_thread awake"

<< dendl;

}ldout

(cct,10)

<<

"timer_thread exiting"

<< dendl;

lock.

unlock()

;}

add_event_after,add_event_at,cancel_event,cancel_all_events作用就是schedule和events中的事件新增和刪除,保證事件都能夠執行到。

safetimer在ceph中常用於定時的任務,比如monitor節點對集群的監控,osd之間的心跳機制等等。下面從ceph的osd.cc中的原始碼來分析safetimer的使用。

通常safetimer定義之後,會呼叫safetimer的呼叫函式,確定其將響應的事件cct,然後會呼叫init()函式初始化並啟動定時器。如osd.cc中

agent_timer

(osd-

>client_messenger-

>cct, agent_timer_lock);

agent_timer.

init()

;

如果需要持續不斷執行某一任務,那麼就需要不斷地呼叫add_event_after()函式來為schedule新增事件,如

void osdservice::

agent_entry()

uint64_t level = agent_queue.

rbegin()

->first;

set& top = agent_queue.

rbegin()

->second;

dout(10

)<<

__func__

<<

" tiers "

<< agent_queue.

size()

<<

", top is "

<< level

<<

" with pgs "

<< top.

size()

<<

", ops "

<< agent_ops <<

"/"<< cct-

>_conf-

>osd_agent_max_ops

<<

(agent_active ?

" active"

:" not active"

)<< dendl;

dout(20

)<<

__func__

<<

" oids "

<< agent_oids << dendl;

int max = cct-

>_conf-

>osd_agent_max_ops - agent_ops;

int agent_flush_quota = max;if(

!flush_mode_high_count)

agent_flush_quota = cct-

>_conf-

>osd_agent_max_low_ops - agent_ops;

if(agent_flush_quota <=

0|| top.

empty()

||!agent_active)if(

!agent_valid_iterator || agent_queue_pos == top.

end())

pgref pg =

*agent_queue_pos;

dout(10

)<<

"high_count "

<< flush_mode_high_count

<<

" agent_ops "

<< agent_ops

<<

" flush_quota "

<< agent_flush_quota << dendl;

agent_lock.

unlock()

;if(!pg-

>

agent_work

(max, agent_flush_quota)

) agent_lock.

lock()

;}agent_lock.

unlock()

;dout(10

)<<

__func__

<<

" finish"

<< dendl;

}

如果不再需要定時器,那麼直接shutdown()就可以

agent_timer.

shutdown()

;

Ceph中的序列化

yuandong 2015.05.11 作為主要和磁碟 網路打交道的分布式儲存系統,序列化是最基礎的功能之一,今天我們來看一下ceph中序列化的設計與實現。1 ceph序列化的方式 序列化 ceph稱之為encode 的目的是將資料結構表示為二進位製流的方式,以便通過網路傳輸或儲存在磁碟等儲存介質上...

在ceph中 pool PG OSD的關係

pool是儲存物件的邏輯分割槽,它規定了資料冗餘的型別和對應的副本分布策略 支援兩種型別 副本 replicated 和 糾刪碼 erasure code 目前我們公司內部使用的pool都是副本型別 3副本 pg placement group 是乙個放置策略組,它是物件的集合,該集合裡的所有物件都...

部署CEPH 一 (CEPH的環境準備)

實驗拓撲圖 配置ceph節點 二 為node1節點儲存各台主機的金鑰 三 為node1節點生成公鑰及金鑰 四 實現node1節點遠端各台主機免密登陸 包含node1自己 node1操作 五 為node6節點 客戶端 配置時間伺服器 node6操作 六 為node1 5配置為ntp伺服器 node6 ...