如果你待處理的任務(wù)是(任一滿(mǎn)足一個(gè)條件)1、可拆分為多個(gè)子任務(wù),或任務(wù)是多個(gè)相同的任務(wù)的集合;2、任務(wù)不是CPU密集型的,如任務(wù)涉及到較多IO操作(如文件讀取和網(wǎng)絡(luò)數(shù)據(jù)處理)python學(xué)習(xí)
那么你使用多線(xiàn)程將任務(wù)并行運(yùn)行,能夠提高運(yùn)行效率。
假設(shè)待處理的任務(wù)為:有很多文件目錄,對(duì)于每個(gè)文件目錄,搜索匹配一個(gè)給定字符串的文件的所有行(相當(dāng)于是實(shí)現(xiàn)grep的功能)。 則此處子任務(wù)為:給定一個(gè)目錄,搜索匹配一個(gè)給定字符串的文件的所有行??偟娜蝿?wù)為處理所有目錄。
將子任務(wù)表示為一個(gè)函數(shù)T,如下所示:
def T(dir, pattern):
print('searching pattern %s in dir %s' % (pattern, dir))
...
為每個(gè)子任務(wù)創(chuàng)建一個(gè)線(xiàn)程
要實(shí)現(xiàn)并行化,最簡(jiǎn)單的方法是為每一個(gè)子任務(wù)創(chuàng)建一個(gè)thread,thread處理完后退出。
from threading import Threadfrom time import sleep
def T(dir, pattern):
"This is just a stub that simulate a dir operation"
sleep(1)
print('searching pattern %s in dir %s' % (pattern, dir))
threads = []dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']pattern = 'hello'
for dir in dirs:
thread = Thread(target=T, args=(dir, pattern)) 1
thread.start() 2
threads.append(thread)
for thread in threads:
thread.join() 3
print('Main thread end here')
1 :創(chuàng)建一個(gè)Thread對(duì)象,target參數(shù)指定這個(gè)thread待執(zhí)行的函數(shù),args參數(shù)指定target函數(shù)的輸入?yún)?shù)
2 :?jiǎn)?dòng)這個(gè)thread。 T(dir, pattern)將被調(diào)用
3 :等待,直到這個(gè)thread結(jié)束。整個(gè)for循環(huán)表示主進(jìn)程會(huì)等待所有子線(xiàn)程結(jié)束后再退出
程序的運(yùn)行結(jié)果為:
searching pattern hello in dir a/b/csearching pattern hello in dir d/f
searching pattern hello in dir b/c
searching pattern hello in dir a/b/d
Main thread end here
可以看出由于線(xiàn)程是并行運(yùn)行的,部分輸出會(huì)交疊。但主進(jìn)程的打印總在最后。
以上例子中對(duì)于每個(gè)dir都需要?jiǎng)?chuàng)建一個(gè)thread。如果dir的數(shù)目較多,則會(huì)創(chuàng)建太多的thread,影響運(yùn)行效率。 較好的方式是限制總線(xiàn)程的數(shù)目。
限制線(xiàn)程數(shù)目
可以使用信號(hào)量(semaphore)來(lái)限制同時(shí)運(yùn)行的最大線(xiàn)程數(shù)目。如下所示:
from threading import Thread, BoundedSemaphorefrom time import sleep
def T(dir, pattern):
"This is just a stub that simulate a dir operation"
sleep(1)
print('searching pattern %s in dir %s' % (pattern, dir))
threads = []dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']pattern = 'hello'
maxjobs = BoundedSemaphore(2) 1def wrapper(dir, pattern):
T(dir, pattern)
maxjobs.release() 2
for dir in dirs:
maxjobs.acquire() 3
thread = Thread(target=wrapper, args=(dir, pattern))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print('Main thread end here')
1 :創(chuàng)建一個(gè)有2個(gè)資源的信號(hào)量。一個(gè)信號(hào)量代表總的可用的資源數(shù)目,這里表示同時(shí)運(yùn)行的最大線(xiàn)程數(shù)目為2。
2 :在線(xiàn)程結(jié)束時(shí)釋放資源。運(yùn)行在子線(xiàn)程中。
3 :在啟動(dòng)一個(gè)線(xiàn)程前,先獲取一個(gè)資源。如果當(dāng)前已經(jīng)有2個(gè)線(xiàn)程在運(yùn)行,則會(huì)阻塞,直到其中一個(gè)線(xiàn)程結(jié)束。 運(yùn)行在主線(xiàn)程中。
當(dāng)限制了最大運(yùn)行線(xiàn)程數(shù)為2后,由于只有2個(gè)線(xiàn)程同時(shí)運(yùn)行,程序的輸出更加有序,幾乎總是為:
searching pattern hello in dir a/b/c
searching pattern hello in dir a/b/d
searching pattern hello in dir b/c
searching pattern hello in dir d/f
Main thread end here
以上實(shí)現(xiàn)中為每個(gè)子任務(wù)創(chuàng)建一個(gè)線(xiàn)程進(jìn)行處理,然后通過(guò)信號(hào)量限制同時(shí)運(yùn)行的線(xiàn)程的數(shù)目。如果子任務(wù)很多,這種方法會(huì)創(chuàng)建太多的線(xiàn)程。更好的方法 是使用線(xiàn)程池。
使用線(xiàn)程池(THREAD POOL)
即預(yù)先創(chuàng)建一定數(shù)目的線(xiàn)程,形成一個(gè)線(xiàn)程池。每個(gè)線(xiàn)程持續(xù)處理多個(gè)子任務(wù)(而不是處理一個(gè)就退出)。這樣做的好處是:創(chuàng)建的線(xiàn)程數(shù)目會(huì)比較固定。
那么,每個(gè)線(xiàn)程處理哪些子任務(wù)呢?一種方法為:預(yù)先將所有子任務(wù)均分給每個(gè)線(xiàn)程。如下所示:
from threading import Threadfrom time import sleep
def T(dir, pattern):
"This is just a stub that simulate a dir operation"
sleep(1)
print('searching pattern %s in dir %s' % (pattern, dir))
dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']pattern = 'hello'
def wrapper(dirs, pattern): 1
for dir in dirs:
T(dir, pattern)
threadsPool = [ 2
Thread(target=wrapper, args=(dirs[0:2], pattern)),
Thread(target=wrapper, args=(dirs[2:], pattern)),]
for thread in threadsPool: 3
thread.start()
for thread in threadsPool:
thread.join()
print('Main thread end here')
1 :這個(gè)函數(shù)能夠處理多個(gè)dir,將作為線(xiàn)程的target函數(shù)
2 :創(chuàng)建一個(gè)有2個(gè)線(xiàn)程的線(xiàn)程池。并事先分配子任務(wù)給每個(gè)線(xiàn)程。線(xiàn)程1處理前兩個(gè)dir,線(xiàn)程2處理后兩個(gè)dir
3 :?jiǎn)?dòng)線(xiàn)程池中所有線(xiàn)程
程序的輸出結(jié)果為:
searching pattern hello in dir a/b/csearching pattern hello in dir b/c
searching pattern hello in dir d/f
searching pattern hello in dir a/b/d
Main thread end here
這種方法存在以下問(wèn)題:
子任務(wù)分配可能不均。導(dǎo)致每個(gè)線(xiàn)程運(yùn)行時(shí)間差別可能較大,則整體運(yùn)行時(shí)長(zhǎng)可能被拖長(zhǎng)
只能處理所有子任務(wù)都預(yù)先知道的情況,無(wú)法處理子任務(wù)實(shí)時(shí)出現(xiàn)的情況
如果有一種方法,能夠讓線(xiàn)程知道當(dāng)前所有的待處理子任務(wù),線(xiàn)程一旦空閑,便可以從中獲取一個(gè)任務(wù)進(jìn)行處理,則以上問(wèn)題都可以解決。任務(wù)隊(duì)列便是解決方案。
使用消息隊(duì)列
可以使用Queue實(shí)現(xiàn)一個(gè)任務(wù)隊(duì)列,用于在線(xiàn)程間傳遞子任務(wù)。主線(xiàn)程將所有待處理子任務(wù)放置在隊(duì)列中,子線(xiàn)程從隊(duì)列中獲取子任務(wù)去處理。 如下所有(注:以下代碼只運(yùn)行于Python 2,因?yàn)?/span>Queue只存在于Python 2) :
from threading import Threadfrom time import sleepimport Queue
def T(dir, pattern):
"This is just a stub that simulate a dir operation"
sleep(1)
print('searching pattern %s in dir %s' % (pattern, dir))
dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']pattern = 'hello'
taskQueue = Queue.Queue() 1
def wrapper():
while True:
try:
dir = taskQueue.get(True, 0.1) 2
T(dir, pattern)
except Queue.Empty:
continue
threadsPool = [Thread(target=wrapper) for i in range(2)] 3
for thread in threadsPool:
thread.start() 4
for dir in dirs:
taskQueue.put(dir) 5
for thread in threadsPool:
thread.join()print('Main thread end here')
1 :創(chuàng)建一個(gè)任務(wù)隊(duì)列
2 :子線(xiàn)程從任務(wù)隊(duì)列中獲取一個(gè)任務(wù)。第一個(gè)參數(shù)為True,表示如果沒(méi)有任務(wù),會(huì)等待。第二個(gè)參數(shù)表示最長(zhǎng)等待0.1秒 如果在0.1秒后仍然沒(méi)有任務(wù),則會(huì)拋出一個(gè)Queue.Empty的異常
3 :創(chuàng)建有2個(gè)線(xiàn)程的線(xiàn)程池。注意target函數(shù)wrapper沒(méi)有任何參數(shù)
4 :?jiǎn)?dòng)所有線(xiàn)程
5 :主線(xiàn)程將所有子任務(wù)放置在任務(wù)隊(duì)列中,以供子線(xiàn)程獲取處理。由于子線(xiàn)程已經(jīng)被啟動(dòng),則子線(xiàn)程會(huì)立即獲取到任務(wù)并處理
程序的輸出為:
searching pattern hello in dir a/b/c
searching pattern hello in dir a/b/d
searching pattern hello in dir b/c
searching pattern hello in dir d/f
從中可以看出主進(jìn)程的打印結(jié)果并沒(méi)有出來(lái),程序會(huì)一直運(yùn)行,而不退出。這個(gè)問(wèn)題的原因是:目前的實(shí)現(xiàn)中,子線(xiàn)程為一個(gè)無(wú)限循環(huán), 因此其永遠(yuǎn)不會(huì)終止。因此,必須有一種機(jī)制來(lái)結(jié)束子進(jìn)程。
終止子進(jìn)程
一種簡(jiǎn)單方法為,可以在任務(wù)隊(duì)列中放置一個(gè)特殊元素,作為終止符。當(dāng)子線(xiàn)程從任務(wù)隊(duì)列中獲取這個(gè)終止符后,便自行退出。如下所示,使用None作為終止符。
from threading import Threadfrom time import sleepimport Queue
def T(dir, pattern):
"This is just a stub that simulate a dir operation"
sleep(1)
print('searching pattern %s in dir %s' % (pattern, dir))
dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']pattern = 'hello'
taskQueue = Queue.Queue()
def wrapper():
while True:
try:
dir = taskQueue.get(True, 0.1)
if dir is None: 1
taskQueue.put(dir) 2
break
T(dir, pattern)
except Queue.Empty:
continue
threadsPool = [Thread(target=wrapper) for i in range(2)]
for thread in threadsPool:
thread.start()
for dir in dirs:
taskQueue.put(dir)
taskQueue.put(None) 3
for thread in threadsPool:
thread.join()print('Main thread end here')
1 :如果任務(wù)為終止符(此處為None),則退出
2 :將這個(gè)終止符重新放回任務(wù)隊(duì)列。因?yàn)橹挥幸粋€(gè)終止符,如果不放回,則其它子線(xiàn)程獲取不到,也就無(wú)法終止
3 :將終止符放在任務(wù)隊(duì)列。注意必須放置在末尾,否則終止符后的任務(wù)無(wú)法得到處理
修改過(guò)后,程序能夠正常運(yùn)行,主進(jìn)程能夠正常退出了。
searching pattern hello in dir a/b/csearching pattern hello in dir a/b/d
searching pattern hello in dir b/c
searching pattern hello in dir d/f
Main thread end here
總結(jié)
要并行化處理子任務(wù),最簡(jiǎn)單的方法是為每個(gè)子任務(wù)創(chuàng)建一個(gè)線(xiàn)程去處理。這種方法的缺點(diǎn)是:如果子任務(wù)非常多,則需要?jiǎng)?chuàng)建的線(xiàn)程數(shù)目會(huì)非常多。 并且同時(shí)運(yùn)行的線(xiàn)程數(shù)目也會(huì)較多。通過(guò)使用信號(hào)量來(lái)限制同時(shí)運(yùn)行的線(xiàn)程數(shù)目,通過(guò)線(xiàn)程池來(lái)避免創(chuàng)建過(guò)多的線(xiàn)程。
與每個(gè)線(xiàn)程處理一個(gè)任務(wù)不同,線(xiàn)程池中每個(gè)線(xiàn)程會(huì)處理多個(gè)子任務(wù)。這帶來(lái)一個(gè)問(wèn)題:每個(gè)子線(xiàn)程如何知道要處理哪些子任務(wù)。 一種方法是預(yù)先將所有子任務(wù)均分給每個(gè)線(xiàn)程,而更靈活的方法則是通過(guò)任務(wù)隊(duì)列,由子線(xiàn)程自行決定要處理哪些任務(wù)。
使用線(xiàn)程池時(shí),線(xiàn)程主函數(shù)通常實(shí)現(xiàn)為一個(gè)無(wú)限循環(huán),因此需要考慮如何終止線(xiàn)程??梢栽谌蝿?wù)隊(duì)列中放置一個(gè)終止符來(lái)告訴線(xiàn)程沒(méi)有更多任務(wù), 因此其可以終止。