99热99这里只有精品6国产,亚洲中文字幕在线天天更新,在线观看亚洲精品国产福利片 ,久久久久综合网

歡迎加入QQ討論群258996829
麥子學(xué)院 頭像
蘋果6袋
6
麥子學(xué)院

使用python如何對(duì)分布式系統(tǒng)進(jìn)行協(xié)調(diào)?

發(fā)布時(shí)間:2016-11-02 22:26  回復(fù):0  查看:2635   最后回復(fù):2016-11-02 22:26  

隨著大數(shù)據(jù)時(shí)代的到來(lái),分布式是解決大數(shù)據(jù)問(wèn)題的一個(gè)主要手段,隨著越來(lái)越多的分布式的服務(wù),如何在分布式的系統(tǒng)中對(duì)這些服務(wù)做協(xié)調(diào)變成了一個(gè)很棘手的問(wèn)題。今天我們就來(lái)看看如何使用Python語(yǔ)言,利用開源對(duì)分布式服務(wù)做協(xié)調(diào)。

  在對(duì)分布式的應(yīng)用做協(xié)調(diào)的時(shí)候,主要會(huì)碰到以下的應(yīng)用場(chǎng)景:

· 業(yè)務(wù)發(fā)現(xiàn)(service discovery)找到分布式系統(tǒng)中存在那些可用的服務(wù)和節(jié)點(diǎn)

· 名字服務(wù) (name service)通過(guò)給定的名字知道到對(duì)應(yīng)的資源

· 配置管理 (configuration management)如何在分布式的節(jié)點(diǎn)中共享配置文件,保證一致性。

· 故障發(fā)現(xiàn)和故障轉(zhuǎn)移 (failure detection and failover)當(dāng)某一個(gè)節(jié)點(diǎn)出故障的時(shí)候,如何檢測(cè)到并通知其它節(jié)點(diǎn), 或者把想用的服務(wù)轉(zhuǎn)移到其它的可用節(jié)點(diǎn)

· 領(lǐng)導(dǎo)選舉(leader election)如何在眾多的節(jié)點(diǎn)中選舉一個(gè)領(lǐng)導(dǎo)者,來(lái)協(xié)調(diào)所有的節(jié)點(diǎn)

· 分布式的鎖 (distributed exclusive lock)如何通過(guò)鎖在分布式的服務(wù)中進(jìn)行同步

· 消息和通知服務(wù) (message queue and notification)如何在分布式的服務(wù)中傳遞消息,以通知的形式對(duì)事件作出主動(dòng)的響應(yīng)

  有許多的開源軟件試圖解決以上的全部或者部分問(wèn)題,例如ZooKeeper,consuldoozerd等等,我們現(xiàn)在就看看它們是如何做的。

ZooKeeper

ZooKeeper 是使用最廣泛,也是最有名的解決分布式服務(wù)的協(xié)調(diào)問(wèn)題的開源軟件了,它最早和Hadoop一起開發(fā),后來(lái)成為了Apache的頂級(jí)項(xiàng)目,很多開源的項(xiàng)目都在使用ZooKeeper,例如大名鼎鼎的Kafka。

Zookeeper本身是一個(gè)分布式的應(yīng)用,通過(guò)對(duì)共享的數(shù)據(jù)的管理來(lái)實(shí)現(xiàn)對(duì)分布式應(yīng)用的協(xié)調(diào)。

ZooKeeper使用一個(gè)樹形目錄作為數(shù)據(jù)模型,這個(gè)目錄和文件目錄類似,目錄上的每一個(gè)節(jié)點(diǎn)被稱作ZNodes。

使用python如何對(duì)分布式系統(tǒng)進(jìn)行協(xié)調(diào)?

ZooKeeper提供基本的API來(lái)操縱和控制Znodes,包括對(duì)節(jié)點(diǎn)的創(chuàng)建,刪除,設(shè)置和獲取數(shù)據(jù),獲得子節(jié)點(diǎn)等。

  除了這些基本的操作,ZooKeeper還提供了一些配方(Recipe),其實(shí)就是一些常見(jiàn)的用例,例如鎖,兩階段提交,領(lǐng)導(dǎo)選舉等等。

ZooKeeper本身是用Java開發(fā)的,所以對(duì)Java的支持是最自然的。它同時(shí)還提供了C語(yǔ)言的綁定。

Kazoo 是一個(gè)非常成熟的Zookeeper Python客戶端,我們這就看看如果使用Python來(lái)調(diào)用ZooKeeper。(注意,運(yùn)行以下的例子,需要在本地啟動(dòng)ZooKeeper的服務(wù))

基本操作

  以下的例子現(xiàn)實(shí)了對(duì)Znode的基本操作,首先要?jiǎng)?chuàng)建一個(gè)客戶端的連接,并啟動(dòng)客戶端。然后我們可以利用該客戶端對(duì)Znode做增刪改,取內(nèi)容的操作。最后推出客戶端。

from kazoo.client import KazooClient

import logging

logging.basicConfig()

zk = KazooClient(hosts='127.0.0.1:2181')

zk.start()

# Ensure a path, create if necessary

zk.ensure_path("/test/zk1")

# Create a node with data

zk.create("/test/zk1/node", b"a test value")

# Determine if a node existsif zk.exists("/test/zk1"):

print "the node exist"

# Print the version of a node and its data

data, stat = zk.get("/test/zk1")print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

# List the children

children = zk.get_children("/test/zk1")print("There are %s children with names %s" % (len(children), children))

zk.stop()

  通過(guò)對(duì)ZNode的操作,我們可以完成一些分布式服務(wù)協(xié)調(diào)的基本需求,包括名字服務(wù),配置服務(wù),分組等等。

故障檢測(cè)(Failure Detection)

  在分布式系統(tǒng)中,一個(gè)最基本的需求就是當(dāng)某一個(gè)服務(wù)出問(wèn)題的時(shí)候,能夠通知其它的節(jié)點(diǎn)或者某個(gè)管理節(jié)點(diǎn)。

ZooKeeper提供ephemeral Node的概念,當(dāng)創(chuàng)建該Node的服務(wù)退出或者異常中止的時(shí)候,該Node會(huì)被刪除,所以我們就可以利用這種行為來(lái)監(jiān)控服務(wù)運(yùn)行狀態(tài)。

  以下是worker的代碼

from kazoo.client import KazooClientimport time

import logging

logging.basicConfig()

zk = KazooClient(hosts='127.0.0.1:2181')

zk.start()

# Ensure a path, create if necessary

zk.ensure_path("/test/failure_detection")

# Create a node with data

zk.create("/test/failure_detection/worker",

value=b"a test value", ephemeral=True)

while True:

print "I am alive!"

time.sleep(3)

zk.stop()

  以下的monitor 代碼,監(jiān)控worker服務(wù)是否運(yùn)行。

from kazoo.client import KazooClient

import time

import logging

logging.basicConfig()

zk = KazooClient(hosts='127.0.0.1:2181')

zk.start()

# Determine if a node existswhile True:

if zk.exists("/test/failure_detection/worker"):

print "the worker is alive!"

else:

print "the worker is dead!"

break

time.sleep(3)

zk.stop()

領(lǐng)導(dǎo)選舉

Kazoo直接提供了領(lǐng)導(dǎo)選舉的API,使用起來(lái)非常方便。

from kazoo.client import KazooClientimport timeimport uuid

import logging

logging.basicConfig()

my_id = uuid.uuid4()

def leader_func():

print "I am the leader {}".format(str(my_id))

while True:

print "{} is working! ".format(str(my_id))

time.sleep(3)

zk = KazooClient(hosts='127.0.0.1:2181')

zk.start()

election = zk.Election("/electionpath")

# blocks until the election is won, then calls# leader_func()

election.run(leader_func)

zk.stop()

  你可以同時(shí)運(yùn)行多個(gè)worker,其中一個(gè)會(huì)獲得Leader,當(dāng)你殺死當(dāng)前的leader后,會(huì)有一個(gè)新的leader被選出。

分布式鎖

  鎖的概念大家都熟悉,當(dāng)我們希望某一件事在同一時(shí)間只有一個(gè)服務(wù)在做,或者某一個(gè)資源在同一時(shí)間只有一個(gè)服務(wù)能訪問(wèn),這個(gè)時(shí)候,我們就需要用到鎖。

from kazoo.client import KazooClientimport timeimport uuid

import logging

logging.basicConfig()

my_id = uuid.uuid4()

def work():

print "{} is working! ".format(str(my_id))

zk = KazooClient(hosts='127.0.0.1:2181')

zk.start()

lock = zk.Lock("/lockpath", str(my_id))

print "I am {}".format(str(my_id))

while True:

with lock:

work()

time.sleep(3)

zk.stop()

  當(dāng)你運(yùn)行多個(gè)worker的時(shí)候,不同的worker會(huì)試圖獲取同一個(gè)鎖,然而只有一個(gè)worker會(huì)工作,其它的worker必須等待獲得鎖后才能執(zhí)行。

監(jiān)視

ZooKeeper提供了監(jiān)視(Watch)的功能,當(dāng)節(jié)點(diǎn)的數(shù)據(jù)被修改的時(shí)候,監(jiān)控的function會(huì)被調(diào)用。我們可以利用這一點(diǎn)進(jìn)行配置文件的同步,發(fā)消息,或其他需要通知的功能。

from kazoo.client import KazooClientimport time

import logging

logging.basicConfig()

zk = KazooClient(hosts='127.0.0.1:2181')

zk.start()

@zk.DataWatch('/path/to/watch')def my_func(data, stat):

if data:

print "Data is %s" % data

print "Version is %s" % stat.version

else :

print "data is not available"

while True:

time.sleep(10)

zk.stop()

  除了我們上面列舉的內(nèi)容外,Kazoo還提供了許多其他的功能,例如:計(jì)數(shù),租約,隊(duì)列等等。

Consul

Consul 是用Go開發(fā)的分布式服務(wù)協(xié)調(diào)管理的工具,它提供了服務(wù)發(fā)現(xiàn),健康檢查,KeyValue存儲(chǔ)等功能,并且支持跨數(shù)據(jù)中心的功能。

Consul提供ZooKeeper類似的功能,它的基于HTTPAPI可以方便的和各種語(yǔ)言進(jìn)行綁定。自然 Python 也在列。

  與Zookeeper有所差異的是Consul通過(guò)基于ClientServer架構(gòu)的Agent部署來(lái)支持跨Data Center的功能。


使用python如何對(duì)分布式系統(tǒng)進(jìn)行協(xié)調(diào)?

ConsulCluster傷的每一個(gè)節(jié)點(diǎn)都運(yùn)行一個(gè)Agent,這個(gè)Agent可以使Server或者Client模式。Client負(fù)責(zé)到Server的高效通信,相對(duì)為無(wú)狀態(tài)的。Server負(fù)責(zé)包括選舉領(lǐng)導(dǎo)節(jié)點(diǎn),維護(hù)cluster的狀態(tài),對(duì)所有的查詢做響應(yīng),跨數(shù)據(jù)中心的通信等等。

KV基本操作

  類似于ZookeeperConsul支持對(duì)KV的增刪查改的操作。

import consul

c = consul.Consul()

set data for key fooc.kv.put('foo', 'bar')

# poll a key for updatesindex = Nonewhile True:

index, data = c.kv.get('foo', index=index)

print data['Value']

c.kv.delete('foo')

  這里和ZooKeeper對(duì)Znode的操作幾乎是一樣的。

服務(wù)發(fā)現(xiàn)(Service Discovery)和健康檢查(Health Check

Consul的另一個(gè)主要的功能是用于對(duì)分布式的服務(wù)做管理,用戶可以注冊(cè)一個(gè)服務(wù),同時(shí)還提供對(duì)服務(wù)做健康檢測(cè)的功能。

  首先,用戶需要定義一個(gè)服務(wù)。

{

"service": {

"name": "redis",

"tags": ["master"],

"address": "127.0.0.1",

"port": 8000,

"checks": [

{

"script": "/usr/local/bin/check_redis.py",

"interval": "10s"

}

]

}}

  其中,服務(wù)的名字是必須的,其它的字段可以自選,包括了服務(wù)的地址,端口,相應(yīng)的健康檢查的腳本。當(dāng)用戶注冊(cè)了一個(gè)服務(wù)后,就可以通過(guò)Consul來(lái)查詢?cè)摲?wù),獲得該服務(wù)的狀態(tài)。

Consul支持三種Check的模式:

· 調(diào)用一個(gè)外部腳本(Script),在該模式下,consul定時(shí)會(huì)調(diào)用一個(gè)外部腳本,通過(guò)腳本的返回內(nèi)容獲得對(duì)應(yīng)服務(wù)的健康狀態(tài)。

· 調(diào)用HTTP,在該模式下,consul定時(shí)會(huì)調(diào)用一個(gè)HTTP請(qǐng)求,返回2XX,則為健康;429 Too many request)是警告。其它均為不健康

· 主動(dòng)上報(bào),在該模式下,服務(wù)需要主動(dòng)調(diào)用一個(gè)consul提供的HTTP PUT請(qǐng)求,上報(bào)健康狀態(tài)。

Python API提供對(duì)應(yīng)的接口,大家可以參考 http://python-consul.readthedocs.org/en/latest/

· Consul.Agent.Service

· Consul.Agent.Check

ConsulHealth CheckZookeeperFailure Detection略有不同,ZooKeeper可以利用ephemeral Node來(lái)檢測(cè)服務(wù)的狀態(tài),ConsulHealth Check,通過(guò)調(diào)用腳本,HTTP或者主動(dòng)上報(bào)的方式檢查服務(wù)的狀態(tài),更為靈活,可以獲得等多的信息,但是也需要做更多的工作。

故障檢測(cè)(Failure Detection)

Consul提供Session的概念,利用Session可以檢查服務(wù)是否存活。

  對(duì)每一個(gè)服務(wù)我們都可以創(chuàng)建一個(gè)session對(duì)象,注意這里我們?cè)O(shè)置了ttl,consul會(huì)以ttl的數(shù)值為間隔時(shí)間,持續(xù)的對(duì)session的存活做檢查。對(duì)應(yīng)的在服務(wù)中,我們需要持續(xù)的renew session,保證session是合法的。

import consulimport time

c = consul.Consul()

s = c.session.create(name="worker",behavior='delete',ttl=10)

print "session id is {}".format(s)

while True:

c.session.renew(s)

print "I am alive ..."

time.sleep(3)

Moniter代碼用于監(jiān)控worker相關(guān)聯(lián)的session的狀態(tài),但發(fā)現(xiàn)worker session已經(jīng)不存在了,就做出響應(yīng)的處理。

import consulimport time

def is_session_exist(name, sessions):

for s in sessions:

if s['Name'] == name:

return True

return False

c = consul.Consul()

while True:

index, sessions = c.session.list()

if is_session_exist('worker', sessions):

print "worker is alive ..."

else:

print 'worker is dead!'

break

time.sleep(3)

  這里注意,因?yàn)槭腔?/span>ttl(最小10秒)的檢測(cè),從業(yè)務(wù)中斷到被檢測(cè)到,至少有10秒的時(shí)延,對(duì)應(yīng)需要實(shí)時(shí)響應(yīng)的情景,并不適用。Zookeeper使用ephemeral Node的方式時(shí)延相對(duì)短一點(diǎn),但也非實(shí)時(shí)。

領(lǐng)導(dǎo)選舉和分布式的鎖

  無(wú)論是Consul本身還是Python客戶端,都不直接提供Leader Election的功能。

  當(dāng)對(duì)某一個(gè)Keyput操作的時(shí)候,可以創(chuàng)建一個(gè)session對(duì)象,設(shè)置一個(gè)acquire標(biāo)志為該 session,這樣就獲得了一個(gè)鎖,獲得所得客戶則是被選舉的leader。

  代碼如下:

import consulimport time

c = consul.Consul()

def request_lead(namespace, session_id):

lock = c.kv.put(leader_namespace,"leader check", acquire=session_id)

return lock

def release_lead(session_id):

c.session.destroy(session_id)

def whois_lead(namespace):

index,value = c.kv.get(namespace)

session = value.get('Session')

if session is None:

print 'No one is leading, maybe in electing'

else:

index, value = c.session.info(session)

print '{} is leading'.format(value['ID'])

def work_non_block():

print "working"

def work_block():

while True:

print "working"

time.sleep(3)

leader_namespace = 'leader/test'

## initialize leader key/value node

leader_index, leader_node = c.kv.get(leader_namespace)

if leader_node is None:

c.kv.put(leader_namespace,"a leader test")

while True:

whois_lead(leader_namespace)

session_id = c.session.create(ttl=10)

if request_lead(leader_namespace,session_id):

print "I am now the leader"

work_block()

release_lead(session_id)

else:

print "wait leader elected!"

time.sleep(3)

  利用同樣的機(jī)制,可以方便的實(shí)現(xiàn)鎖,信號(hào)量等分布式的同步操作。

監(jiān)視

ConsulAgent提供了Watch的功能,然而Python客戶端并沒(méi)有相應(yīng)的接口。

etcd

etcd 是另一個(gè)用GO開發(fā)的分布式協(xié)調(diào)應(yīng)用,它提供一個(gè)分布式的KeyValue存儲(chǔ)來(lái)進(jìn)行共享的配置管理和服務(wù)發(fā)現(xiàn)。

基本操作

import etcd

client = etcd.Client()

client.write('/nodes/n1', 1)print client.read('/nodes/n1').value

etcd對(duì)節(jié)點(diǎn)的操作和ZooKeeper類似,不過(guò)etcd不支持ZooKeeperephemeral Node的概念,要監(jiān)控服務(wù)的狀態(tài)似乎比較麻煩。

分布式鎖

etcd支持分布式鎖,以下是一個(gè)例子。

import sys

sys.path.append("../../")

import etcd

import uuidimport time

my_id = uuid.uuid4()

def work():

print "I get the lock {}".format(str(my_id))

client = etcd.Client()

lock = etcd.Lock(client, '/customerlock', ttl=60)

with lock as my_lock:

work()

lock.is_locked() # True

lock.renew(60)

lock.is_locked() # False

總結(jié)

ZooKeeper無(wú)疑是分布式協(xié)調(diào)應(yīng)用的最佳選擇,功能全,社區(qū)活躍,用戶群體很大,對(duì)所有典型的用例都有很好的封裝,支持不同語(yǔ)言的綁定。缺點(diǎn)是,整個(gè)應(yīng)用比較重,依賴于Java,不支持跨數(shù)據(jù)中心。

Consul作為使用Go語(yǔ)言開發(fā)的分布式協(xié)調(diào),對(duì)業(yè)務(wù)發(fā)現(xiàn)的管理提供很好的支持,他的HTTP API也能很好的和不同的語(yǔ)言綁定,并支持跨數(shù)據(jù)中心的應(yīng)用。缺點(diǎn)是相對(duì)較新,適合喜歡嘗試新事物的用戶。

etcd是一個(gè)更輕量級(jí)的分布式協(xié)調(diào)的應(yīng)用,提供了基本的功能,更適合一些輕量級(jí)的應(yīng)用來(lái)使用。

 

文章來(lái)源:伯樂(lè)在線

您還未登錄,請(qǐng)先登錄

熱門帖子

最新帖子

?