多執行緒的 pipeline 設計模式

2021-08-29 13:23:25 字數 3386 閱讀 4402

乙個簡單例子:

有很多個html網頁,網頁的id、title、url、path等資訊存在乙個資料庫表中,網頁內容儲存在乙個磁碟陣列上。現在要把所有網頁都讀出來,統計其中的html標籤、正文等資訊,並寫入另乙個資料庫表,怎樣的設計最好呢?

一般的想法是使用多個平行的執行緒,每個執行緒處理某個id範圍的網頁。但是仔細分析就可以發現,對每個網頁的處理可以分為以下處理步驟:

讀取資料庫行

讀取檔案內容

解析html,生成統計資料

將統計結果寫入資料庫

這幾個處理步驟有各自的特徵,讀取資料庫的時間一般主要消耗在資料庫伺服器響應,讀取檔案內容一般主要消耗在磁碟io上,解析、統計消耗在計算上,寫統計結果也消耗在資料庫伺服器響應上。如果我們為這幾個過程建立各自的執行緒,每個任務通過訊息佇列來傳遞。就得到如下設計:

在這個設計中,每個處理過程可以根據需要設定不同的執行緒數,這個例子中,資料庫不會是瓶頸,只剩下讀檔案和計算,如果檔案io夠快(如果網頁存在不同的陣列上),那麼可以增加計算執行緒(伺服器一般都是多cpu的)來達到平衡。

一些例子或許還會有更多的處理步驟。

可以從中得出乙個設計模式,甚至可以直接寫出實現框架的類:

pipeline.h

/* vim: set tabstop=4 : */

#ifndef

__febird_pipeline_h__

#define

__febird_pipeline_h__

#ifdefined

(_msc_ver)&&

(_msc_ver

>=

1020

)# pragma

once

# pragma

warning

(push

)# pragma

warning

(disable

:4018

)# pragma

warning

(disable

:4267

)#endif

#include

#include

#include

#include

#include

"../thread/concurrentqueue.h"

//#include "../thread/locksentry.h"

//#include "../thread/thread.h"

namespace

febird;

class

pipelinemultitask

:public

pipelinetask;

class

pipelinestep

;class

pipelinethread

;class

pipelineprocessor;

class

pipelinestep

queue_t

*getoutqueue

()const

void

stop

();};

class

funpipelinestep

:public

pipelinestep

{boost

::function3

<

void

,pipelinestep

*,int

,pipelinetask

*&>

m_process

;// take(this, threadno, task)

boost

::function2

<

void

,pipelinestep

*,int

>

m_setup

;// take(this, threadno)

boost

::function2

<

void

,pipelinestep

*,int

>

m_clean

;// take(this, threadno)

void

process

(int

threadno

,pipelinetask

*&task

);void

setup

(int

threadno

);void

clean

(int

threadno);

void

default_setup

(int

threadno

);void

default_clean

(int

threadno

);static

void

static_default_setup

(pipelinestep

*self

,int

threadno

);static

void

static_default_clean

(pipelinestep

*self

,int

threadno);

public

:funpipelinestep

(int

thread_count

,const

boost

::function3

<

void

,pipelinestep

*,int

,pipelinetask

*&>&

fprocess

,const

boost

::function2

<

void

,pipelinestep

*,int

>&

fsetup

,const

boost

::function2

<

void

,pipelinestep

*,int

>&

fclean

);funpipelinestep

(int

thread_count

,const

boost

::function3

<

void

,pipelinestep

*,int

,pipelinetask

*&>&

fprocess

,const

std::

string

&step_name=""

多執行緒設計模式 Master Worker模式

master worker是常用的平行計算模式。它的核心思想是系統由兩類程序協作工作 master程序和worker程序。master負責接收和分配任務,worker負責處理子任務。當各個worker子程序處理完成後,會將結果返回給master,由master作歸納總結。其好處就是能將乙個大任務分解...

玩轉併發 多執行緒Per Message設計模式

thread per message的意思是為每乙個訊息的出來開闢乙個執行緒使得訊息能夠以併發的方式進行處理,從而提高整個系統的吞吐量。這就好比 接線員,收到的每乙個 都會交由乙個工作人員處理。定義message類 public class message public intgetvalue pu...

python之多執行緒threading模組的使用

import threading print threading.active count 返回當前存活的執行緒數量 輸出1 print threading.enumerate 返回當前所有存活的執行緒列表 輸出 mainthread started print threading.current ...