Python自定義主從分布式架構例項分析

2022-10-04 21:30:50 字數 3339 閱讀 6842

環境:win7 x64,python 2.7,apscheduler 2.1.2。

原理圖如下:

**部分:

(1)、中心節點:

#encoding=utf-8

#author: walker

#date: 2014-12-03

#function: 中心節點(主要功能是分配任務)

import socketserver, socket, queue

centerip = '127.0.0.1' #中心節點ip

centerlistenport = 9999 #中心節點監聽埠

centerclient = socket.socket(socket.af_inet, socket.sock_dgram) #中心節點用於傳送網路訊息的socket

taskqueue = queue.queue() #任務佇列

#獲取任務佇列

def gettaskqueue():

for i in range(1, 11):

taskqueue.put(str(i))

#centerserver的**函式,在接受到udp報文是觸發

class myudphandler(socketserver.baserequesthandler):

def handle(self):

data = self.request[0].strip()

socket = self.request[1www.cppcns.com]

print(data)

if data.startswith('wait'):

vec = data.split(':')

if len(vec) != 3:

print('error: len(vec) != 3')

else:

nodeip = vec[1]

nodelistenport = vec[2]

nodeid = nodeip + ':' + nodelistenport

if not taskqueue.empty():

task = taskqueue.get()

程式設計客棧 print('send task ' + task + ' to ' + nodeid)

centerclient.sendto('task:' + task, (nodeip, int(nodelistenport)))

else:

print('taskqueue is e')

gettaskqueue() #獲取任務佇列

centerserver = socketserver.udpserver((centerip, centerlistenport), myudphandler)

print('listen port ' + str(centerlistenport) + ' ...')

centerserver.serve_forever()

(2)、任務節點:

#encoding=utf-8

#author: walker

#date: 2014-12-03

#function: 任務節點(請求/接收/執行任務)

import time, socket, socketserver

from apscheduler.scheduler import scheduler

centerip = '127.0.0.1' #中心節點ip

cen程式設計客棧terlistenport = 9999 #中心節點監聽埠

nodeip = socket.gethostbyname(socket.gethostname()) #任務節點自身ip

nodeclient = socket.socket(socket.af_inet, sock程式設計客棧et.sock_dgram) #任務節點用於傳送網路訊息的socket

#任務:傳送網路資訊

def jobsendnetmsg():

msg = ''

if nodeserver.taskstate == 'wait':

msg = 'wait:' + nodeip + ':' + str(nodelistenport)

elif nodeserver.taskstate == 'exec':

msg = 'exec:' + nodeip + ':' + str(nodelistenport)

print(msg)

nodeclient.sendto(msg, (centerip, centerlistenport))

#新增並啟動定時任務

def inittimer():

sched = scheduler()

sched.add_interval_job(jobsendnetmsg, seconds=1)

sched.start()

#執行任務

def exectask(task):

print('exectask ' + task + ' ...')

time.sleep(2)

print('exectask ' + task + ' over')

#nodeserver的**函式,在接受到udp報文是觸發

class myudphandler(socketserver.baserequesthandler):

def handle(self):

data = self.request[0].strip()

socket = self.request[1]

print('recv data: ' + data)

if data.startswith('task'):

vec = data.split(':')

if len(vec) != 2:

print('error: len(vec) != 2')

else:

task = vec[1]

self.server.taskstate = 'exec'

exectask(task)

self.server.taskstate = 'wait'

inittimer()

nodeserver = socketserver.udpserver(('', 0), myudphandler)

nodeserver.taskstate = 'wait' #(exec/wait)

nodelistenport = nodeserver.server_address[1]

print('nodelistenport:' + str(nodelistenport))

nodeserver.serve_forever()

python分布式架構 分布式架構

1.分布式架構 採用centos mongodb windows2012 python redis進行分布式架構搭建,mongodb的框架最核心的設計就是 mongodb和mapreduce。mongodb為海量的資料提供了儲存,則mapreduce為海量的資料提供了計算,windows2012作為...

python 分布式程序

process可以分布到多台機器上,而thread最多只能分布到同一臺機器的多個cpu上。python的multiprocessing模組不但支援多程序,其中managers子模組還支援把多程序分布到多台機器上。乙個服務程序可以作為排程者,將任務分布到其他多個程序中,依靠網路通訊。由於manager...

Python 分布式程序

分布式程序是將process程序分布到多台伺服器中,利用多台機器的效能完成複雜的任務。可以應用到分布式爬蟲的開發中。分布式程序在python中依然要用到multiprocess模組。它不但支援多程序,其中managers子模組還支援吧多程序分不到多台機器上,可以寫乙個服務程序作為排程者,將任務分不到...