Python 分布式程序

2022-03-17 07:27:54 字數 4728 閱讀 4783

分布式程序是將process程序分布到多台伺服器中,利用多台機器的效能完成複雜的任務。可以應用到分布式爬蟲的開發中。

分布式程序在python中依然要用到multiprocess模組。它不但支援多程序,其中managers子模組還支援吧多程序分不到多台機器上,可以寫乙個服務程序作為排程者,將任務分不到其他多個程序中,依靠網路通訊進行管理。

例:分布式:

主要問題就是講queue暴露在網路中,讓其他機器的程序都可以訪問,分布式程序就是將這一過程進行了封裝,可以將之成為本佇列的網路化。

建立分布式程序需要的六個步驟:

1、建立佇列queue,用來程序之間的通訊:

服務程序建立任務佇列task_queue,用來作為傳遞惹怒給任務程序的通道。在分布式多程序環境下必須通過由queuemanager

獲得queue介面來新增任務。

2、把第一步中建立的佇列在網路上註冊,暴露給其他程序(主機),註冊後獲得網路佇列,相當於本地佇列的映像。

3、建立乙個物件(queuemanager(basemanage))例項manager,繫結埠和驗證口令。

4、啟動第三部中建立的例項,即啟動管理manager,監管通訊通道。

5、通過管理例項的方法獲得通過網路訪問queue物件,即再把網路佇列例項化成可已使用的本地佇列。

6、建立人物到"本地"佇列中,自動上傳人物到網路佇列中,分配給任務程序進行處理。

python版本:3.5

系統:mac os

taskmanager.py

#

!/usr/bin/env python

#-*- coding: utf-8 -*-

__author__ = '

fade zhao'#

服務程序(taskmanager.py)

#匯入隨機,時間,佇列

from multiprocessing.managers import

basemanager

import

time,random,queue

##實現第一步:建立task_queue和result_queue,用來存放任務和結果

task_queue =queue.queue()

result_queue =queue.queue()

class

queuemanager(basemanager):

pass

#實現第二步:把建立的兩個佇列註冊在網路上,利用register方法,callable引數關聯了queue物件,

#將queue物件在網路中暴露

queuemanager.register('

get_task_queue

',callable=lambda

:task_queue)

queuemanager.register(

'get_result_queue

',callable=lambda

:result_queue)

#實現第三步:繫結埠8001,設定驗證口令b『abc』(是b格式)。這個相當於物件的初始化

manager=queuemanager(address=('

127.0.0.1

',1234),authkey=b'

abc',)#

實現第四步:啟動管理,監聽資訊通道

manager.start()

#實現第五步:通過管理例項的方法獲得通過網路訪問的queue物件

task=manager.get_task_queue()

result=manager.get_result_queue()

#實現第六步:新增任務

for url in ["

imageurl_

"+str(i) for i in range(10)]:

print('

put task %s ...

' %url)

task.put(url)

#獲取返回結果

print('

try get result...')

for i in range(10):

print('

result is %s

' %result.get(timeout=10))

#關閉管理

manager.shutdown()

taskworker.py

#

!/usr/bin/env python

#-*- coding: utf-8 -*-

__author__ = '

fade zhao

'import

time

from multiprocessing.managers import

basemanager

#建立類似的queuemanager:

class

queuemanager(basemanager):

pass

#實現第一步:使用queuemanager註冊獲取queue的方法名稱

queuemanager.register('

get_task_queue')

queuemanager.register(

'get_result_queue')

#實現第二步:連線到伺服器:

server_addr = '

127.0.0.1

'print('

connect to server %s...

' %server_addr)

#埠和驗證口令注意保持與服務程序設定的完全一致:

m = queuemanager(address=(server_addr, 1234), authkey=b'

abc')#

從網路連線:

m.connect()

#實現第三步:獲取queue的物件:

task =m.get_task_queue()

result =m.get_result_queue()

#實現第四步:從task佇列取任務,並把結果寫入result佇列:

while(not

task.empty()):

image_url = task.get(true,timeout=5)

print('

run task download %s...

' %image_url)

time.sleep(1)

result.put(

'%s--->success

'%image_url)

結果:

taskmanager>>>connect to server 127.0.0.1...

run task download imageurl_0...

run task download imageurl_1...

run task download imageurl_2...

run task download imageurl_3...

run task download imageurl_4...

run task download imageurl_5...

run task download imageurl_6...

run task download imageurl_7...

run task download imageurl_8...

run task download imageurl_9...

taskworker>>>connect to server 127.0.0.1...

run task download imageurl_0...

run task download imageurl_1...

run task download imageurl_2...

run task download imageurl_3...

run task download imageurl_4...

run task download imageurl_5...

run task download imageurl_6...

run task download imageurl_7...

run task download imageurl_8...

run task download imageurl_9...

注意,當我們在一台機器上寫多程序程式時,建立的queue可以直接拿來用,但是,在分布式多程序環境下,新增任務到queue不可以直接對原始的task_queue進行操作,那樣就繞過了queuemanager的封裝,必須通過manager.get_task_queue()獲得的queue介面新增。

這個簡單的manager/worker模型有什麼用?其實這就是乙個簡單但真正的分布式計算,把**稍加改造,啟動多個worker,就可以把任務分布到幾台甚至幾十台機器上,比如換成傳送郵件,就實現了郵件佇列的非同步傳送。

其中的queue是在taskmanage程序中被建立註冊到網路中,而taskworker通過網路獲取queue。

authkey 為了保證兩台機器正常通訊,不被其他機器惡意干擾。如果taskworker.py的authkey和taskmanager.py的authkey不一致,肯定連線不上。

python 分布式程序

process可以分布到多台機器上,而thread最多只能分布到同一臺機器的多個cpu上。python的multiprocessing模組不但支援多程序,其中managers子模組還支援把多程序分布到多台機器上。乙個服務程序可以作為排程者,將任務分布到其他多個程序中,依靠網路通訊。由於manager...

python 學習 分布式程序

伺服器端 import random,time,queue from multiprocessing.managers import basemanager 傳送任務的佇列 task queue queue.queue 接收結果的佇列 result queue queue.queue class q...

Python之 分布式程序

要實現上面的功能,建立分布式程序需要分為六個步驟 接下來通過程式實現上面的例子 linux版本 首先編寫的是服務程序。如下 coding utf 8 author liuyazhuang date 2018 10 14 10 18 description 分布式服務程序linux版 version ...