歡迎加入QQ討論群258996829
麥子學(xué)院 頭像
蘋(píng)果6袋
6
麥子學(xué)院

什么樣的任務(wù)可以用線(xiàn)程將任務(wù)并行化呢?

發(fā)布時(shí)間:2018-05-15 20:56  回復(fù):0  查看:2925   最后回復(fù):2018-05-15 20:56  

如果待處理的任務(wù)是(任一滿(mǎn)足一個(gè)條件)1、可拆分為多個(gè)子任務(wù),或任務(wù)是多個(gè)相同的任務(wù)的集合;2、任務(wù)不是CPU密集型的,如任務(wù)涉及到較多IO操作(如文件讀取和網(wǎng)絡(luò)數(shù)據(jù)處理python學(xué)習(xí)

那么你使用多線(xiàn)程將任務(wù)并行運(yùn)行,能夠提高運(yùn)行效率。

假設(shè)待處理的任務(wù)為:有很多文件目錄,對(duì)于每個(gè)文件目錄,搜索匹配一個(gè)給定字符串的文件的所有行(相當(dāng)于是實(shí)現(xiàn)grep的功能)。 則此處子任務(wù)為:給定一個(gè)目錄,搜索匹配一個(gè)給定字符串的文件的所有行??偟娜蝿?wù)為處理所有目錄。

將子任務(wù)表示為一個(gè)函數(shù)T,如下所示:

def T(dir, pattern):

  print('searching pattern %s in dir %s' % (pattern, dir))

  ...

為每個(gè)子任務(wù)創(chuàng)建一個(gè)線(xiàn)程

要實(shí)現(xiàn)并行化,最簡(jiǎn)單的方法是為每一個(gè)子任務(wù)創(chuàng)建一個(gè)thread,thread處理完后退出。

from threading import Threadfrom time import sleep

def T(dir, pattern):

  "This is just a stub that simulate a dir operation"

  sleep(1)

  print('searching pattern %s in dir %s' % (pattern, dir))

threads = []dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']pattern = 'hello'

for dir in dirs:

  thread = Thread(target=T, args=(dir, pattern))   1

  thread.start()   2

  threads.append(thread)

for thread in threads:

  thread.join()   3

print('Main thread end here')

:創(chuàng)建一個(gè)Thread對(duì)象,target參數(shù)指定這個(gè)thread待執(zhí)行的函數(shù),args參數(shù)指定target函數(shù)的輸入?yún)?shù)

:?jiǎn)?dòng)這個(gè)thread。 T(dir, pattern)將被調(diào)用

:等待,直到這個(gè)thread結(jié)束。整個(gè)for循環(huán)表示主進(jìn)程會(huì)等待所有子線(xiàn)程結(jié)束后再退出

程序的運(yùn)行結(jié)果為:

searching pattern hello in dir a/b/csearching pattern hello in dir d/f

searching pattern hello in dir b/c

 searching pattern hello in dir a/b/d

 

Main thread end here

可以看出由于線(xiàn)程是并行運(yùn)行的,部分輸出會(huì)交疊。但主進(jìn)程的打印總在最后。

以上例子中對(duì)于每個(gè)dir都需要?jiǎng)?chuàng)建一個(gè)thread。如果dir的數(shù)目較多,則會(huì)創(chuàng)建太多的thread,影響運(yùn)行效率。 較好的方式是限制總線(xiàn)程的數(shù)目。

限制線(xiàn)程數(shù)目

可以使用信號(hào)量(semaphore)來(lái)限制同時(shí)運(yùn)行的最大線(xiàn)程數(shù)目。如下所示:

from threading import Thread, BoundedSemaphorefrom time import sleep

def T(dir, pattern):

  "This is just a stub that simulate a dir operation"

  sleep(1)

  print('searching pattern %s in dir %s' % (pattern, dir))

threads = []dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']pattern = 'hello'

maxjobs = BoundedSemaphore(2)   1def wrapper(dir, pattern):

  T(dir, pattern)

  maxjobs.release()   2

for dir in dirs:

  maxjobs.acquire()   3

  thread = Thread(target=wrapper, args=(dir, pattern))

  thread.start()

  threads.append(thread)

for thread in threads:

  thread.join()

print('Main thread end here')

:創(chuàng)建一個(gè)有2個(gè)資源的信號(hào)量。一個(gè)信號(hào)量代表總的可用的資源數(shù)目,這里表示同時(shí)運(yùn)行的最大線(xiàn)程數(shù)目為2。

:在線(xiàn)程結(jié)束時(shí)釋放資源。運(yùn)行在子線(xiàn)程中。

:在啟動(dòng)一個(gè)線(xiàn)程前,先獲取一個(gè)資源。如果當(dāng)前已經(jīng)有2個(gè)線(xiàn)程在運(yùn)行,則會(huì)阻塞,直到其中一個(gè)線(xiàn)程結(jié)束。 運(yùn)行在主線(xiàn)程中。

當(dāng)限制了最大運(yùn)行線(xiàn)程數(shù)為2后,由于只有2個(gè)線(xiàn)程同時(shí)運(yùn)行,程序的輸出更加有序,幾乎總是為:

searching pattern hello in dir a/b/c

searching pattern hello in dir a/b/d

searching pattern hello in dir b/c

searching pattern hello in dir d/f

Main thread end here

以上實(shí)現(xiàn)中為每個(gè)子任務(wù)創(chuàng)建一個(gè)線(xiàn)程進(jìn)行處理,然后通過(guò)信號(hào)量限制同時(shí)運(yùn)行的線(xiàn)程的數(shù)目。如果子任務(wù)很多,這種方法會(huì)創(chuàng)建太多的線(xiàn)程。更好的方法 是使用線(xiàn)程池。

使用線(xiàn)程池(THREAD POOL

即預(yù)先創(chuàng)建一定數(shù)目的線(xiàn)程,形成一個(gè)線(xiàn)程池。每個(gè)線(xiàn)程持續(xù)處理多個(gè)子任務(wù)(而不是處理一個(gè)就退出)。這樣做的好處是:創(chuàng)建的線(xiàn)程數(shù)目會(huì)比較固定。

那么,每個(gè)線(xiàn)程處理哪些子任務(wù)呢?一種方法為:預(yù)先將所有子任務(wù)均分給每個(gè)線(xiàn)程。如下所示:

from threading import Threadfrom time import sleep

def T(dir, pattern):

  "This is just a stub that simulate a dir operation"

  sleep(1)

  print('searching pattern %s in dir %s' % (pattern, dir))

dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']pattern = 'hello'

def wrapper(dirs, pattern):   1

  for dir in dirs:

    T(dir, pattern)

threadsPool = [   2

  Thread(target=wrapper, args=(dirs[0:2], pattern)),

  Thread(target=wrapper, args=(dirs[2:], pattern)),]

for thread in threadsPool:   3

  thread.start()

for thread in threadsPool:

  thread.join()

print('Main thread end here')

:這個(gè)函數(shù)能夠處理多個(gè)dir,將作為線(xiàn)程的target函數(shù)

:創(chuàng)建一個(gè)有2個(gè)線(xiàn)程的線(xiàn)程池。并事先分配子任務(wù)給每個(gè)線(xiàn)程。線(xiàn)程1處理前兩個(gè)dir,線(xiàn)程2處理后兩個(gè)dir

:?jiǎn)?dòng)線(xiàn)程池中所有線(xiàn)程

程序的輸出結(jié)果為:

searching pattern hello in dir a/b/csearching pattern hello in dir b/c

 

searching pattern hello in dir d/f

 searching pattern hello in dir a/b/d

Main thread end here

這種方法存在以下問(wèn)題:

子任務(wù)分配可能不均。導(dǎo)致每個(gè)線(xiàn)程運(yùn)行時(shí)間差別可能較大,則整體運(yùn)行時(shí)長(zhǎng)可能被拖長(zhǎng)

只能處理所有子任務(wù)都預(yù)先知道的情況,無(wú)法處理子任務(wù)實(shí)時(shí)出現(xiàn)的情況

如果有一種方法,能夠讓線(xiàn)程知道當(dāng)前所有的待處理子任務(wù),線(xiàn)程一旦空閑,便可以從中獲取一個(gè)任務(wù)進(jìn)行處理,則以上問(wèn)題都可以解決。任務(wù)隊(duì)列便是解決方案。

使用消息隊(duì)列

可以使用Queue實(shí)現(xiàn)一個(gè)任務(wù)隊(duì)列,用于在線(xiàn)程間傳遞子任務(wù)。主線(xiàn)程將所有待處理子任務(wù)放置在隊(duì)列中,子線(xiàn)程從隊(duì)列中獲取子任務(wù)去處理。 如下所有(注:以下代碼只運(yùn)行于Python 2,因?yàn)?/span>Queue只存在于Python 2) :

from threading import Threadfrom time import sleepimport Queue

def T(dir, pattern):

  "This is just a stub that simulate a dir operation"

  sleep(1)

  print('searching pattern %s in dir %s' % (pattern, dir))

dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']pattern = 'hello'

taskQueue = Queue.Queue()   1

def wrapper():

  while True:

    try:

      dir = taskQueue.get(True, 0.1)   2

      T(dir, pattern)

    except Queue.Empty:

continue

threadsPool = [Thread(target=wrapper) for i in range(2)]   3

for thread in threadsPool:

  thread.start()    4

for dir in dirs:

  taskQueue.put(dir)   5

for thread in threadsPool:

  thread.join()print('Main thread end here')

:創(chuàng)建一個(gè)任務(wù)隊(duì)列

:子線(xiàn)程從任務(wù)隊(duì)列中獲取一個(gè)任務(wù)。第一個(gè)參數(shù)為True,表示如果沒(méi)有任務(wù),會(huì)等待。第二個(gè)參數(shù)表示最長(zhǎng)等待0.1秒 如果在0.1秒后仍然沒(méi)有任務(wù),則會(huì)拋出一個(gè)Queue.Empty的異常

:創(chuàng)建有2個(gè)線(xiàn)程的線(xiàn)程池。注意target函數(shù)wrapper沒(méi)有任何參數(shù)

:?jiǎn)?dòng)所有線(xiàn)程

:主線(xiàn)程將所有子任務(wù)放置在任務(wù)隊(duì)列中,以供子線(xiàn)程獲取處理。由于子線(xiàn)程已經(jīng)被啟動(dòng),則子線(xiàn)程會(huì)立即獲取到任務(wù)并處理

程序的輸出為:

searching pattern hello in dir a/b/c

searching pattern hello in dir a/b/d

searching pattern hello in dir b/c

 searching pattern hello in dir d/f

從中可以看出主進(jìn)程的打印結(jié)果并沒(méi)有出來(lái),程序會(huì)一直運(yùn)行,而不退出。這個(gè)問(wèn)題的原因是:目前的實(shí)現(xiàn)中,子線(xiàn)程為一個(gè)無(wú)限循環(huán), 因此其永遠(yuǎn)不會(huì)終止。因此,必須有一種機(jī)制來(lái)結(jié)束子進(jìn)程。

終止子進(jìn)程

一種簡(jiǎn)單方法為,可以在任務(wù)隊(duì)列中放置一個(gè)特殊元素,作為終止符。當(dāng)子線(xiàn)程從任務(wù)隊(duì)列中獲取這個(gè)終止符后,便自行退出。如下所示,使用None作為終止符。

from threading import Threadfrom time import sleepimport Queue

def T(dir, pattern):

  "This is just a stub that simulate a dir operation"

  sleep(1)

  print('searching pattern %s in dir %s' % (pattern, dir))

dirs = ['a/b/c', 'a/b/d', 'b/c', 'd/f']pattern = 'hello'

taskQueue = Queue.Queue()

def wrapper():

  while True:

    try:

      dir = taskQueue.get(True, 0.1)

      if dir is None:   1

taskQueue.put(dir)   2

break

 

      T(dir, pattern)

    except Queue.Empty:

continue

threadsPool = [Thread(target=wrapper) for i in range(2)]

for thread in threadsPool:

  thread.start()

for dir in dirs:

  taskQueue.put(dir)

taskQueue.put(None)   3

for thread in threadsPool:

  thread.join()print('Main thread end here')

:如果任務(wù)為終止符(此處為None),則退出

:將這個(gè)終止符重新放回任務(wù)隊(duì)列。因?yàn)橹挥幸粋€(gè)終止符,如果不放回,則其它子線(xiàn)程獲取不到,也就無(wú)法終止

:將終止符放在任務(wù)隊(duì)列。注意必須放置在末尾,否則終止符后的任務(wù)無(wú)法得到處理

修改過(guò)后,程序能夠正常運(yùn)行,主進(jìn)程能夠正常退出了。

searching pattern hello in dir a/b/csearching pattern hello in dir a/b/d

 

searching pattern hello in dir b/c

 searching pattern hello in dir d/f

Main thread end here

總結(jié)

要并行化處理子任務(wù),最簡(jiǎn)單的方法是為每個(gè)子任務(wù)創(chuàng)建一個(gè)線(xiàn)程去處理。這種方法的缺點(diǎn)是:如果子任務(wù)非常多,則需要?jiǎng)?chuàng)建的線(xiàn)程數(shù)目會(huì)非常多。 并且同時(shí)運(yùn)行的線(xiàn)程數(shù)目也會(huì)較多。通過(guò)使用信號(hào)量來(lái)限制同時(shí)運(yùn)行的線(xiàn)程數(shù)目,通過(guò)線(xiàn)程池來(lái)避免創(chuàng)建過(guò)多的線(xiàn)程。

與每個(gè)線(xiàn)程處理一個(gè)任務(wù)不同,線(xiàn)程池中每個(gè)線(xiàn)程會(huì)處理多個(gè)子任務(wù)。這帶來(lái)一個(gè)問(wèn)題:每個(gè)子線(xiàn)程如何知道要處理哪些子任務(wù)。 一種方法是預(yù)先將所有子任務(wù)均分給每個(gè)線(xiàn)程,而更靈活的方法則是通過(guò)任務(wù)隊(duì)列,由子線(xiàn)程自行決定要處理哪些任務(wù)。

使用線(xiàn)程池時(shí),線(xiàn)程主函數(shù)通常實(shí)現(xiàn)為一個(gè)無(wú)限循環(huán),因此需要考慮如何終止線(xiàn)程??梢栽谌蝿?wù)隊(duì)列中放置一個(gè)終止符來(lái)告訴線(xiàn)程沒(méi)有更多任務(wù), 因此其可以終止。

您還未登錄,請(qǐng)先登錄

熱門(mén)帖子

最新帖子

?