本文和大家分享的主要是使用python
實現(xiàn)分布式任務(wù)相關(guān)內(nèi)容,一起來看看吧,希望對大家
學(xué)習(xí)python有所幫助。
深入讀了讀python
的官方文檔,發(fā)覺
Python
自帶的
multiprocessing
模塊有很多預(yù)制的接口可以方便的實現(xiàn)多個主機之間的通訊,進而實現(xiàn)典型的生產(chǎn)者
-
消費者模式的分布式任務(wù)架構(gòu)。
之前,為了在Python
中實現(xiàn)生產(chǎn)者
-
消費者模式,往往就會選擇一個額外的隊列系統(tǒng),比如
rabbitMQ
之類。此外,你有可能還要設(shè)計一套任務(wù)對象的序列化方式以便塞入隊列。如果沒有隊列的支持,那不排除有些同學(xué)不得不從
socket
服務(wù)器做起,直接跟
TCP/IP
打起交道來。
其實multiprocessing.managers
中有個
BaseManager
就為開發(fā)者提供了這樣一個快速接口。
我們假定的場景是1
個生產(chǎn)者(
producer.py
)
+8
個消費者
(worker.py)
的系統(tǒng),還有一個中央節(jié)點負責(zé)協(xié)調(diào)(
server.py
)實現(xiàn)如下:
server.py
from multiprocessing.managers
import BaseManager
import Queue
queue = Queue.Queue() #
初始化一個
Q
,用于消息傳遞
class
QueueManager(BaseManager):
pass
QueueManager.register('get_queue', callable=
lambda:queue) #
在系統(tǒng)中發(fā)布
get_queue
這個業(yè)務(wù)
if __name__ == '__main__':
m = QueueManager(address=('10.239.85.193', 50000),authkey='abr' )
#
監(jiān)聽所有
10.239.85.193
的
50000
口
s = m.get_server()
s.serve_forever()
worker.py
from multiprocessing.managers
import BaseManager
from multiprocessing
import Pool
class
QueueManager(BaseManager):
pass
QueueManager.register('get_queue')
def
feb(i): #
經(jīng)典的
'
山羊增殖
'
if i < 2:
return 1
if i < 5 :
return feb(i-1) + feb(i-2)
return feb(i-1) + feb(i-2) - feb(i-5)
def
worker(i):
m = QueueManager(address=('10.239.85.193', 50000), authkey='abr')#
連接
server
m.connect()
while
True:
queue = m.get_queue()#
獲取
Q
c = queue.get()
print feb(c)
if __name__ == '__main__':
p = Pool(8) #
分進程啟動
8
個
worker
p.map(worker, range(8))
producer.py
from multiprocessing.managers
import BaseManager
class
QueueManager(BaseManager):
pass
QueueManager.register('get_queue')
if __name__ == '__main__':
m = QueueManager(address=('10.239.85.193', 50000), authkey='abr')
m.connect()
i = 0
while
True:
queue = m.get_queue()
queue.put(48)
i+=1
系統(tǒng)會直接將Queue()
對象中的數(shù)據(jù)直接封裝后通過
TCP 50000
端口在主機之間傳遞。不過需要注意的是,由于
authkey
的緣故,各個節(jié)點要求
python
的版本一致。
來源:
開源小站