99热99这里只有精品6国产,亚洲中文字幕在线天天更新,在线观看亚洲精品国产福利片 ,久久久久综合网

歡迎加入QQ討論群258996829
麥子學(xué)院 頭像
蘋果6袋
6
麥子學(xué)院

Python語言中如何實現(xiàn)多線程/進(jìn)程并行模塊?

發(fā)布時間:2017-06-06 22:53  回復(fù):0  查看:2371   最后回復(fù):2017-06-06 22:53  
本文和大家分享的主要是使用python 實現(xiàn)多線程、進(jìn)程并行模塊相關(guān)內(nèi)容,一起來看看吧,希望對大家 學(xué)習(xí)python有所幫助。
當(dāng)單線程性能不足時,我們通常會使用多線程/ 多進(jìn)程去加速運行。而這些代碼往往多得令人絕望,需要考慮:
如何創(chuàng)建線程執(zhí)行的函數(shù)?
如何收集結(jié)果?若希望結(jié)果從子線程返回主線程,則還要使用隊列
如何取消執(zhí)行?  直接kill 掉所有線程?信號如何傳遞?
是否需要線程池?  否則反復(fù)創(chuàng)建線程的成本過高了
不僅如此,若改為多進(jìn)程或協(xié)程,代碼還要繼續(xù)修改。若多處使用并行,則這些代碼還會重復(fù)很多遍,非常痛苦。
于是,我們考慮將并行的所有邏輯封裝到一個模塊之內(nèi),向外部提供像串行執(zhí)行一樣的編程體驗,還能徹底解決上面所述的疑難問題。所有代碼不足180 行。
使用時非常簡潔:
def xprint(x):
    time.sleep(1)  # mock a long time task
    yield x*x   
i=0
for item in multi_yield(xrange(100)),xprint, process_mode,3:
    i+=1
    print(item)
    if i>10:
        break
上面的代碼會使用三個進(jìn)程,并行地打印1-10 的平方。當(dāng)打印完 10 之后,進(jìn)程自動回收釋放。就像串行程序一樣簡單。
1.  先實現(xiàn)串行任務(wù)
我們通常會將任務(wù)分割為很多個子塊,從而方便并行。因此可以將任務(wù)抽象為生成器。類似下面的操作,每個seed 都是任務(wù)的種子。
def get_generator():
    for seed in 100:
        yield seed
任務(wù)本身的定義,則可以通過一個接受種子的函數(shù)來實現(xiàn):
def worker(seed):
    # some long time task
    return seed*seed # just example
那么實現(xiàn)串行任務(wù)就像這樣:
for seed in get_generator(n):
    print worker(seed)
進(jìn)一步地,可以將其抽象為下面的函數(shù):
def serial_yield(genenator,worker):
    for seed in generator():
        yield worker(seed)
該函數(shù)通過傳入生成器函數(shù)(generator )和任務(wù)的定義 (worker 函數(shù) ) ,即可再返回一個生成器。消費時:
for result in serial_yield(your_genenator, your_worker):
    print(result)
我們看到,通過定義高階函數(shù),serial_yield 就像 map 函數(shù),對 seed 進(jìn)行加工后輸出。
2.  定義并行任務(wù)
考慮如下場景: boss 負(fù)責(zé)分發(fā)任務(wù)到任務(wù)隊列,多個 worker 從任務(wù)隊列撈數(shù)據(jù),處理完之后,再寫入結(jié)果隊列。主線程從結(jié)果隊列中取結(jié)果即可。
我們定義如下幾種執(zhí)行模式:
async:  異步 / 多協(xié)程
thread:  多線程
process:  多進(jìn)程
使用Python 創(chuàng)建 worker 的代碼如下 ,func 是任務(wù)的定義(是個函數(shù))
def factory(func, args=None, name='task'):
        if args is None:
            args = ()
        if mode == process_mode:
            return multiprocessing.Process(name=name, target=func, args=args)
        if mode == thread_mode:
            import threading
            t = threading.Thread(name=name, target=func, args=args)
            t.daemon = True
            return t
        if mode == async_mode:
            import gevent
            return gevent.spawn(func, *args)
創(chuàng)建隊列的代碼如下,注意seeds 可能是無窮流,因此需要限定隊列的長度,當(dāng)入隊列發(fā)現(xiàn)隊列已滿時,則任務(wù)需要阻塞。
def queue_factory(size):
        if mode == process_mode:
            return multiprocessing.Queue(size)
        elif mode == thread_mode:
            return Queue(size)
        elif mode == async_mode:
            from gevent import queue
            return queue.Queue(size)
什么時候任務(wù)可以終止?  我們羅列如下幾種情況:
所有的seed 都已經(jīng)被消費完了
外部傳入了結(jié)束請求
對第一種情況,我們讓boss seed 消費完之后,在隊列里放入多個 Empty 標(biāo)志, worker 收到 Empty 之后,就會自動退出,下面是 boss 的實現(xiàn)邏輯:
def _boss(task_generator, task_queue, worker_count):
        for task in task_generator:
            task_queue.put(task)
        for i in range(worker_count):
            task_queue.put(Empty)
        print('worker boss finished')
再定義worker 的邏輯:
def _worker(task_queue, result_queue, gene_func):
        import time
        try:
            while not stop_wrapper.is_stop():
                if task_queue.empty():
                    time.sleep(0.01)
                    continue
                task = task.get()
                if task == Empty:
                    result_queue.put(Empty)
                    break
                if task == Stop:
                    break
                for item in gene_func(task):
                    result_queue.put(item)
            print ('worker worker is stop')
        except Exception as e:
            logging.exception(e)
            print ('worker exception, quit')
簡單吧?但是這樣會有問題,這個后面再說,我們把剩余的代碼寫完。
再定義multi_yield 的主要代碼。 代碼非常好理解,創(chuàng)建任務(wù)和結(jié)果隊列,再創(chuàng)建 boss worker 線程 ( 或進(jìn)程 / 協(xié)程 ) 并啟動,之后不停地從結(jié)果隊列里取數(shù)據(jù)就可以了。
def multi_yield(customer_func, mode=thread_mode, worker_count=1, generator=None, queue_size=10):
        workers = []
        result_queue = queue_factory(queue_size)
        task_queue = queue_factory(queue_size)
        main = factory(_boss, args=(generator, task_queue, worker_count), name='_boss')
        for process_id in range(0, worker_count):
            name = 'worker_%s' % (process_id)
            p = factory(_worker, args=(task_queue, result_queue, customer_func), name=name)
            workers.append(p)
        main.start()
        for r in workers:
            r.start()
        count = 0
        while not should_stop():
            data = result_queue.get()
            if data is Empty:
                count += 1
                if count == worker_count:
                    break
                continue
            if data is Stop:
                break
            else:
                yield data
這樣從外部消費時,即可:
def xprint(x):
    time.sleep(1)
    yield x
i=0
for item in multi_yield(xprint, process_mode,3,xrange(100)):
    i+=1
    print(item)
    if i>10:
        break
這樣我們就實現(xiàn)了一個與 serial_yield  功能類似的  multi_yield  。可以定義多個 worker ,從隊列中領(lǐng)任務(wù),而不需重復(fù)地創(chuàng)建和銷毀,更不需要線程池。當(dāng)然,代碼不完全,運行時可能出問題。但以上代碼已經(jīng)說明了核心的功能。完整的代碼可以在文末找到。
但是你也會發(fā)現(xiàn)很嚴(yán)重的問題:
當(dāng)從外部break 時,內(nèi)部的線程并不會自動停止
我們無法判斷隊列的長度,若隊列滿,那么put 操作會永遠(yuǎn)卡死在那里,任務(wù)都不會結(jié)束。
3.  改進(jìn)任務(wù)停止邏輯
最開始想到的,是通過在 multi_yield  函數(shù)參數(shù)中添加一個返回 bool 的函數(shù),這樣當(dāng)外部 break 時,同時將該函數(shù)的返回值置為 True ,內(nèi)部檢測到該標(biāo)志位后強制退出。偽代碼如下 :
_stop=False
def can_stop():
    return _stop
for item in multi_yield(xprint, process_mode,3,xrange(100),can_stop):
    i+=1
    print(item)
    if i>10:
        _stop=True
        break
但這樣并不優(yōu)雅,引入了更多的函數(shù)作為參數(shù),還必須手工控制變量值,非常繁瑣。在多進(jìn)程模式下,stop 標(biāo)志位還如何解決?
我們希望外部在循環(huán)時執(zhí)行了break 后,會自動通知內(nèi)部的生成器。實現(xiàn)方法似乎就是 with 語句,即 contextmanager.
我們實現(xiàn)以下的包裝類:
class Yielder(object):
    def __init__(self, dispose):
        self.dispose = dispose
    def __enter__(self):
        pass
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.dispose()
它實現(xiàn)了with 的原語,參數(shù)是 dispose 函數(shù),作用是退出 with 代碼塊后的回收邏輯。
由于值類型的標(biāo)志位無法在多進(jìn)程環(huán)境中傳遞,我們再創(chuàng)建StopWrapper 類,用于管理停止標(biāo)志和回收資源 :
class Stop_Wrapper():
        def __init__(self):
            self.stop_flag = False
            self.workers=[]
        def is_stop(self):
            return self.stop_flag
        def stop(self):
            self.stop_flag = True
            for process in self.workers:
                if isinstance(process,multiprocessing.Process):
                    process.terminate()
最后的問題是,如何解決隊列滿或空時,put/get 的無限等待問題呢?考慮包裝一下 put/get :包裝在  while True  之中,每隔兩秒 get/put ,這樣即使阻塞時,也能保證可以檢查退出標(biāo)志位。所有線程在主線程結(jié)束后,最遲也能在 2s 內(nèi)自動退出。
def safe_queue_get(queue, is_stop_func=None, timeout=2):
    while True:
        if is_stop_func is not None and is_stop_func():
            return Stop
        try:
            data = queue.get(timeout=timeout)
            return data
        except:
            continue
def safe_queue_put(queue, item, is_stop_func=None, timeout=2):
    while True:
        if is_stop_func is not None and is_stop_func():
            return Stop
        try:
            queue.put(item, timeout=timeout)
            return item
        except:
            continue
如何使用呢?我們只需在multi_yield yield 語句之外加上一行就可以了:
with Yielder(stop_wrapper.stop):
        # create queue,boss,worker, then start all
        # ignore repeat code
        while not should_stop():
            data = safe_queue_get(result_queue, should_stop)
            if data is Empty:
                count += 1
                if count == worker_count:
                    break
                continue
            if data is Stop:
                break
            else:
                yield data
仔細(xì)閱讀上面的代碼,  外部循環(huán)時退出循環(huán),則會自動觸發(fā)stop_wrapper stop 操作,回收全部資源,而不需通過外部的標(biāo)志位傳遞!這樣調(diào)用方在心智完全不需有額外的負(fù)擔(dān)。
實現(xiàn)生成器和上下文管理器的編程語言,都可以通過上述方式實現(xiàn)自動協(xié)程資源回收。筆者也實現(xiàn)了一個C# 版本的,有興趣歡迎交流。
這樣,我們就能像文章開頭那樣,實現(xiàn)并行的迭代器操作了。
4.  結(jié)語
一些實現(xiàn)的細(xì)節(jié)很有趣,我們借助在函數(shù)中定義函數(shù),可以不用復(fù)雜的類去承擔(dān)職責(zé),而僅僅只需函數(shù)。而類似的思想,在函數(shù)式編程中非常常見。
該工具已經(jīng)被筆者的流式語言 etlpy  所集成。但是依然有較多改進(jìn)的空間,如沒有集成分布式執(zhí)行模式。
來源:博客園

您還未登錄,請先登錄

熱門帖子

最新帖子

?