本文和大家分享的主要是使用python
實現(xiàn)多線程、進(jìn)程并行模塊相關(guān)內(nèi)容,一起來看看吧,希望對大家
學(xué)習(xí)python有所幫助。
當(dāng)單線程性能不足時,我們通常會使用多線程/
多進(jìn)程去加速運行。而這些代碼往往多得令人絕望,需要考慮:
如何創(chuàng)建線程執(zhí)行的函數(shù)?
如何收集結(jié)果?若希望結(jié)果從子線程返回主線程,則還要使用隊列
如何取消執(zhí)行?
直接kill
掉所有線程?信號如何傳遞?
是否需要線程池?
否則反復(fù)創(chuàng)建線程的成本過高了
不僅如此,若改為多進(jìn)程或協(xié)程,代碼還要繼續(xù)修改。若多處使用并行,則這些代碼還會重復(fù)很多遍,非常痛苦。
于是,我們考慮將并行的所有邏輯封裝到一個模塊之內(nèi),向外部提供像串行執(zhí)行一樣的編程體驗,還能徹底解決上面所述的疑難問題。所有代碼不足180
行。
使用時非常簡潔:
def xprint(x):
time.sleep(1) # mock a long time task
yield x*x
i=0
for item in multi_yield(xrange(100)),xprint, process_mode,3:
i+=1
print(item)
if i>10:
break
上面的代碼會使用三個進(jìn)程,并行地打印1-10
的平方。當(dāng)打印完
10
之后,進(jìn)程自動回收釋放。就像串行程序一樣簡單。
1.
先實現(xiàn)串行任務(wù)
我們通常會將任務(wù)分割為很多個子塊,從而方便并行。因此可以將任務(wù)抽象為生成器。類似下面的操作,每個seed
都是任務(wù)的種子。
def get_generator():
for seed in 100:
yield seed
任務(wù)本身的定義,則可以通過一個接受種子的函數(shù)來實現(xiàn):
def worker(seed):
# some long time task
return seed*seed # just example
那么實現(xiàn)串行任務(wù)就像這樣:
for seed in get_generator(n):
print worker(seed)
進(jìn)一步地,可以將其抽象為下面的函數(shù):
def serial_yield(genenator,worker):
for seed in generator():
yield worker(seed)
該函數(shù)通過傳入生成器函數(shù)(generator
)和任務(wù)的定義
(worker
函數(shù)
)
,即可再返回一個生成器。消費時:
for result in serial_yield(your_genenator, your_worker):
print(result)
我們看到,通過定義高階函數(shù),serial_yield
就像
map
函數(shù),對
seed
進(jìn)行加工后輸出。
2.
定義并行任務(wù)
考慮如下場景: boss
負(fù)責(zé)分發(fā)任務(wù)到任務(wù)隊列,多個
worker
從任務(wù)隊列撈數(shù)據(jù),處理完之后,再寫入結(jié)果隊列。主線程從結(jié)果隊列中取結(jié)果即可。
我們定義如下幾種執(zhí)行模式:
async:
異步
/
多協(xié)程
thread:
多線程
process:
多進(jìn)程
使用Python
創(chuàng)建
worker
的代碼如下
,func
是任務(wù)的定義(是個函數(shù))
def factory(func, args=None, name='task'):
if args is None:
args = ()
if mode == process_mode:
return multiprocessing.Process(name=name, target=func, args=args)
if mode == thread_mode:
import threading
t = threading.Thread(name=name, target=func, args=args)
t.daemon = True
return t
if mode == async_mode:
import gevent
return gevent.spawn(func, *args)
創(chuàng)建隊列的代碼如下,注意seeds
可能是無窮流,因此需要限定隊列的長度,當(dāng)入隊列發(fā)現(xiàn)隊列已滿時,則任務(wù)需要阻塞。
def queue_factory(size):
if mode == process_mode:
return multiprocessing.Queue(size)
elif mode == thread_mode:
return Queue(size)
elif mode == async_mode:
from gevent import queue
return queue.Queue(size)
什么時候任務(wù)可以終止?
我們羅列如下幾種情況:
所有的seed
都已經(jīng)被消費完了
外部傳入了結(jié)束請求
對第一種情況,我們讓boss
在
seed
消費完之后,在隊列里放入多個
Empty
標(biāo)志,
worker
收到
Empty
之后,就會自動退出,下面是
boss
的實現(xiàn)邏輯:
def _boss(task_generator, task_queue, worker_count):
for task in task_generator:
task_queue.put(task)
for i in range(worker_count):
task_queue.put(Empty)
print('worker boss finished')
再定義worker
的邏輯:
def _worker(task_queue, result_queue, gene_func):
import time
try:
while not stop_wrapper.is_stop():
if task_queue.empty():
time.sleep(0.01)
continue
task = task.get()
if task == Empty:
result_queue.put(Empty)
break
if task == Stop:
break
for item in gene_func(task):
result_queue.put(item)
print ('worker worker is stop')
except Exception as e:
logging.exception(e)
print ('worker exception, quit')
簡單吧?但是這樣會有問題,這個后面再說,我們把剩余的代碼寫完。
再定義multi_yield
的主要代碼。 代碼非常好理解,創(chuàng)建任務(wù)和結(jié)果隊列,再創(chuàng)建
boss
和
worker
線程
(
或進(jìn)程
/
協(xié)程
)
并啟動,之后不停地從結(jié)果隊列里取數(shù)據(jù)就可以了。
def multi_yield(customer_func, mode=thread_mode, worker_count=1, generator=None, queue_size=10):
workers = []
result_queue = queue_factory(queue_size)
task_queue = queue_factory(queue_size)
main = factory(_boss, args=(generator, task_queue, worker_count), name='_boss')
for process_id in range(0, worker_count):
name = 'worker_%s' % (process_id)
p = factory(_worker, args=(task_queue, result_queue, customer_func), name=name)
workers.append(p)
main.start()
for r in workers:
r.start()
count = 0
while not should_stop():
data = result_queue.get()
if data is Empty:
count += 1
if count == worker_count:
break
continue
if data is Stop:
break
else:
yield data
這樣從外部消費時,即可:
def xprint(x):
time.sleep(1)
yield x
i=0
for item in multi_yield(xprint, process_mode,3,xrange(100)):
i+=1
print(item)
if i>10:
break
這樣我們就實現(xiàn)了一個與 serial_yield
功能類似的
multi_yield
。可以定義多個
worker
,從隊列中領(lǐng)任務(wù),而不需重復(fù)地創(chuàng)建和銷毀,更不需要線程池。當(dāng)然,代碼不完全,運行時可能出問題。但以上代碼已經(jīng)說明了核心的功能。完整的代碼可以在文末找到。
但是你也會發(fā)現(xiàn)很嚴(yán)重的問題:
當(dāng)從外部break
時,內(nèi)部的線程并不會自動停止
我們無法判斷隊列的長度,若隊列滿,那么put
操作會永遠(yuǎn)卡死在那里,任務(wù)都不會結(jié)束。
3.
改進(jìn)任務(wù)停止邏輯
最開始想到的,是通過在 multi_yield
函數(shù)參數(shù)中添加一個返回
bool
的函數(shù),這樣當(dāng)外部
break
時,同時將該函數(shù)的返回值置為
True
,內(nèi)部檢測到該標(biāo)志位后強制退出。偽代碼如下
:
_stop=False
def can_stop():
return _stop
for item in multi_yield(xprint, process_mode,3,xrange(100),can_stop):
i+=1
print(item)
if i>10:
_stop=True
break
但這樣并不優(yōu)雅,引入了更多的函數(shù)作為參數(shù),還必須手工控制變量值,非常繁瑣。在多進(jìn)程模式下,stop
標(biāo)志位還如何解決?
我們希望外部在循環(huán)時執(zhí)行了break
后,會自動通知內(nèi)部的生成器。實現(xiàn)方法似乎就是
with
語句,即
contextmanager.
我們實現(xiàn)以下的包裝類:
class Yielder(object):
def __init__(self, dispose):
self.dispose = dispose
def __enter__(self):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
self.dispose()
它實現(xiàn)了with
的原語,參數(shù)是
dispose
函數(shù),作用是退出
with
代碼塊后的回收邏輯。
由于值類型的標(biāo)志位無法在多進(jìn)程環(huán)境中傳遞,我們再創(chuàng)建StopWrapper
類,用于管理停止標(biāo)志和回收資源
:
class Stop_Wrapper():
def __init__(self):
self.stop_flag = False
self.workers=[]
def is_stop(self):
return self.stop_flag
def stop(self):
self.stop_flag = True
for process in self.workers:
if isinstance(process,multiprocessing.Process):
process.terminate()
最后的問題是,如何解決隊列滿或空時,put/get
的無限等待問題呢?考慮包裝一下
put/get
:包裝在
while True
之中,每隔兩秒
get/put
,這樣即使阻塞時,也能保證可以檢查退出標(biāo)志位。所有線程在主線程結(jié)束后,最遲也能在
2s
內(nèi)自動退出。
def safe_queue_get(queue, is_stop_func=None, timeout=2):
while True:
if is_stop_func is not None and is_stop_func():
return Stop
try:
data = queue.get(timeout=timeout)
return data
except:
continue
def safe_queue_put(queue, item, is_stop_func=None, timeout=2):
while True:
if is_stop_func is not None and is_stop_func():
return Stop
try:
queue.put(item, timeout=timeout)
return item
except:
continue
如何使用呢?我們只需在multi_yield
的
yield
語句之外加上一行就可以了:
with Yielder(stop_wrapper.stop):
# create queue,boss,worker, then start all
# ignore repeat code
while not should_stop():
data = safe_queue_get(result_queue, should_stop)
if data is Empty:
count += 1
if count == worker_count:
break
continue
if data is Stop:
break
else:
yield data
仔細(xì)閱讀上面的代碼,
外部循環(huán)時退出循環(huán),則會自動觸發(fā)stop_wrapper
的
stop
操作,回收全部資源,而不需通過外部的標(biāo)志位傳遞!這樣調(diào)用方在心智完全不需有額外的負(fù)擔(dān)。
實現(xiàn)生成器和上下文管理器的編程語言,都可以通過上述方式實現(xiàn)自動協(xié)程資源回收。筆者也實現(xiàn)了一個C#
版本的,有興趣歡迎交流。
這樣,我們就能像文章開頭那樣,實現(xiàn)并行的迭代器操作了。
4.
結(jié)語
一些實現(xiàn)的細(xì)節(jié)很有趣,我們借助在函數(shù)中定義函數(shù),可以不用復(fù)雜的類去承擔(dān)職責(zé),而僅僅只需函數(shù)。而類似的思想,在函數(shù)式編程中非常常見。
該工具已經(jīng)被筆者的流式語言 etlpy
所集成。但是依然有較多改進(jìn)的空間,如沒有集成分布式執(zhí)行模式。
來源:博客園