App下載

使用 Python 進(jìn)行實用的線程編程

花式作死冠軍 2021-09-13 14:13:25 瀏覽數(shù) (2692)
反饋

介紹

Python 不乏并發(fā)選項,標(biāo)準(zhǔn)庫包括對線程、進(jìn)程和異步 I/O 的支持。在許多情況下,Python 通過創(chuàng)建異步、線程和子進(jìn)程等高級模塊,消除了使用這些各種并發(fā)方法的困難。在標(biāo)準(zhǔn)庫之外,還有第三種解決方案,例如twisted、stackless 和處理模塊,僅舉幾例。本文使用實踐示例專門關(guān)注 Python 中的線程處理。網(wǎng)上有很多很好的資源來記錄線程 API,但本文試圖提供常見線程使用模式的實踐示例。

首先定義進(jìn)程和線程之間的區(qū)別很重要。線程與進(jìn)程的不同之處在于它們共享狀態(tài)、內(nèi)存和資源。這個簡單的區(qū)別對于線程來說既是優(yōu)點也是缺點。一方面,線程是輕量級的并且易于通信,但另一方面,它們帶來了一系列問題,包括死鎖、競爭條件和純粹的復(fù)雜性。幸運(yùn)的是,由于 GIL 和排隊模塊,Python 中的線程實現(xiàn)起來比其他語言要簡單得多。

你好 Python 線程

接下來,我假設(shè)你已經(jīng)安裝了 Python 2.5 或更高版本,因為許多示例將使用 Python 語言的更新功能,這些功能至少出現(xiàn)在 Python2.5 中。要開始使用 Python 中的線程,我們將從一個簡單的“Hello World”示例開始:

清單 1. hello_threads_example
        import threading
        import datetime
        
        class ThreadClass(threading.Thread):
          def run(self):
            now = datetime.datetime.now()
            print "%s says Hello World at time: %s" % 
            (self.getName(), now)
        
        for i in range(2):
          t = ThreadClass()
          t.start()

如果你運(yùn)行這個例子,你會得到以下輸出:

      #python hello_threads.py 
      Thread?1 says Hello World at time: 2008?05?13 13:22:50.252069
      Thread?2 says Hello World at time: 2008?05?13 13:22:50.252576

查看此輸出,你可以看到你收到了來自兩個帶有日期戳的線程的 Hello World 語句。如果你查看實際代碼,會發(fā)現(xiàn)有兩個 import 語句;一個導(dǎo)入 datetime 模塊,另一個導(dǎo)入 threading 模塊。該類ThreadClass繼承自threading.Thread,因此,您需要定義一個 run 方法來執(zhí)行您在線程內(nèi)運(yùn)行的代碼。在 run 方法中唯一需要注意的重要事項self.getName()是該方法將標(biāo)識線程的名稱。

最后三行代碼實際上調(diào)用了類并啟動了線程。如果您注意到,t.start()實際上是啟動線程的。線程模塊在設(shè)計時就考慮到了繼承性,實際上是建立在較低級別的線程模塊之上的。在大多數(shù)情況下,繼承自 被認(rèn)為是最佳實踐threading.Thread,因為它為線程編程創(chuàng)建了一個非常自然的 API。

使用帶線程的隊列

正如我之前提到的,當(dāng)線程需要共享數(shù)據(jù)或資源時,線程處理可能會很復(fù)雜。線程模塊確實提供了許多同步原語,包括信號量、條件變量、事件和鎖。雖然存在這些選項,但最好的做法是專注于使用隊列。隊列更容易處理,并使線程編程更加安全,因為它們有效地將所有對資源的訪問集中到單個線程,并允許更清晰、更易讀的設(shè)計模式。

在下一個示例中,你將首先創(chuàng)建一個程序,該程序?qū)⒁来位蛞粋€接一個地獲取網(wǎng)站的 URL,并打印出頁面的前 1024 個字節(jié)。這是使用線程可以更快地完成某些事情的經(jīng)典示例。首先,讓我們使用urllib2模塊一次抓取這些頁面,并對代碼進(jìn)行計時:

清單 2. URL 獲取序列
        import urllib2
        import time
        
        hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"        
        start = time.time()
        #grabs urls of hosts and prints first 1024 bytes of page
        for host in hosts:
          url = urllib2.urlopen(host)
          print url.read(1024)
        
        print "Elapsed Time: %s" % (time.time() ? start)

當(dāng)你運(yùn)行它時,你會得到大量輸出到標(biāo)準(zhǔn)輸出,因為頁面被部分打印。但你會在最后得到這個:

        Elapsed Time: 2.40353488922

讓我們稍微看一下這段代碼。你只導(dǎo)入兩個模塊。首先,urllib2模塊是承擔(dān)重任并抓取網(wǎng)頁的東西。其次,你通過調(diào)用?time.time()?創(chuàng)建一個開始時間值,然后再次調(diào)用它并減去初始值以確定程序執(zhí)行所需的時間。最后,從程序的速度來看,“兩秒半”的結(jié)果并不可怕,但如果你有數(shù)百個網(wǎng)頁要檢索,考慮到當(dāng)前的平均值,大約需要 50 秒??纯磩?chuàng)建線程版本如何加快速度:

清單 3. URL 獲取線程
          #!/usr/bin/env python
          import Queue
          import threading
          import urllib2
          import time
          
          hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
          "http://ibm.com", "http://apple.com"          
          queue = Queue.Queue()
          
          class ThreadUrl(threading.Thread):
          """Threaded Url Grab"""
            def init(self, queue):
              threading.Thread.init(self)
              self.queue = queue
          
            def run(self):
              while True:
                #grabs host from queue
                host = self.queue.get()
            
                #grabs urls of hosts and prints first 1024 bytes of page
                url = urllib2.urlopen(host)
                print url.read(1024)
            
                #signals to queue job is done
                self.queue.task_done()
          
          start = time.time()
          def main():
          
            #spawn a pool of threads, and pass them queue instance 
            for i in range(5):
              t = ThreadUrl(queue)
              t.setDaemon(True)
              t.start()
              
           #populate queue with data   
              for host in hosts:
                queue.put(host)
           
           #wait on the queue until everything has been processed     
           queue.join()
          
          main()
          print "Elapsed Time: %s" % (time.time() ? start)

這個例子有更多的代碼需要解釋,但由于使用了排隊模塊,它并沒有比第一個線程示例復(fù)雜多少。這種模式是在 Python 中使用線程的一種非常常見且推薦的方式。步驟描述如下:

  1. 創(chuàng)建一個?Queue.Queue()?實例,然后用數(shù)據(jù)填充它。
  2. 將填充數(shù)據(jù)的實例傳遞到從?threading.Thread?繼承而創(chuàng)建的?Thread?類中。
  3. 產(chǎn)生一個守護(hù)線程池。
  4. 一次從隊列中拉出一項,并在線程內(nèi)部使用該數(shù)據(jù)(即 run 方法)來完成這項工作。
  5. 工作完成后,向?queue.task_done()?隊列發(fā)送任務(wù)已完成的信號。
  6. 加入隊列,這實際上意味著等到隊列為空,然后退出主程序。

關(guān)于此模式的注意事項:通過將守護(hù)線程設(shè)置為 true,它允許主線程或程序在只有守護(hù)線程處于活動狀態(tài)時退出。這創(chuàng)建了一種控制程序流程的簡單方法,因為你可以在退出之前加入隊列,或等到隊列為空。確切的過程在隊列模塊的文檔中得到了最好的描述,如右側(cè)的資源部分所示:

join()
阻塞,直到隊列中的所有項目都被獲取和處理。每當(dāng)將項目添加到隊列時,未完成任務(wù)的計數(shù)就會增加。每當(dāng)使用者線程調(diào)用 task_done() 以指示該項目已被檢索并且其上的所有工作已完成時,未完成任務(wù)的計數(shù)就會下降。當(dāng)未完成任務(wù)的數(shù)量降至零時, join()解鎖。

使用多個隊列

因為上面演示的模式非常有效,所以通過將額外的線程池與隊列鏈接來擴(kuò)展它是相對簡單的。在上面的示例中,你只是打印出網(wǎng)頁的第一部分。下一個示例返回每個線程抓取的整個網(wǎng)頁,然后將其放入另一個隊列。然后設(shè)置另一個加入第二個隊列的線程池,然后在網(wǎng)頁上工作。本示例中執(zhí)行的工作涉及使用名為 Beautiful Soup 的第三方 Python 模塊解析網(wǎng)頁。僅使用幾行代碼,使用此模塊,你將提取標(biāo)題標(biāo)簽并為你訪問的每個頁面打印出來。

清單 4. 多隊列數(shù)據(jù)挖掘網(wǎng)站
import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"
queue = Queue.Queue()
outqueue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def init(self, queue, outqueue):
        threading.Thread.init(self)
        self.queue = queue
        self.outqueue = outqueue

    def run(self):
        while True:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            #place chunk into out queue
            self.out_queue.put(chunk)

            #signals to queue job is done
            self.queue.task_done()

class DatamineThread(threading.Thread):
    """Threaded Url Grab"""
    def __init(self, out_queue):
        threading.Thread.__init(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            chunk = self.out_queue.get()

            #parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            #signals to queue job is done
            self.out_queue.task_done()

start = time.time()
def main():

    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()


    #wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

main()
print "Elapsed Time: %s" % (time.time() ? start)

如果你運(yùn)行此版本的腳本,你將獲得以下輸出:

  #python url_fetch_threaded_part2.py 

  <title>Google</title>  <title>Yahoo!</title>  <title>Apple</title>  <title>IBM United States</title>  <title>Amazon.com: Online Shopping for Electronics, Apparel,
 Computers, Books, DVDs & more</title>  Elapsed Time: 3.75387597084

在查看代碼時,你可以看到我們添加了另一個隊列實例,然后將該隊列傳遞給第一個線程池類ThreadURL. 接下來,你幾乎為下一個線程池類復(fù)制了完全相同的結(jié)構(gòu)DatamineThread。在這個類的run方法中,從每個線程的隊列中抓取網(wǎng)頁,chunk,然后用Beautiful Soup處理這個chunk。在這種情況下, 你可以使用 Beautiful Soup 來簡單地從每個頁面中提取標(biāo)題標(biāo)簽并打印出來。這個例子可以很容易地變成更有用的東西,因為你擁有基本搜索引擎或數(shù)據(jù)挖掘工具的核心。一個想法是使用 Beautiful Soup 從每個頁面中提取鏈接,然后關(guān)注它們。

總結(jié)

本文探討了 Python 中的線程,并展示了使用隊列來減輕復(fù)雜性和細(xì)微錯誤以及提高可讀代碼的最佳實踐。雖然這個基本模式相對簡單,但它可以通過將隊列和線程池鏈接在一起來解決大量問題。在最后一部分,您開始探索創(chuàng)建一個更復(fù)雜的處理管道,作為未來項目的模型。在資源部分有很多關(guān)于并發(fā)和線程的優(yōu)秀資源。

最后,重要的是要指出線程并不是所有問題的解決方案,而且進(jìn)程可以非常適合許多情況。如果你只需要分叉多個進(jìn)程并監(jiān)聽響應(yīng),那么標(biāo)準(zhǔn)庫 ??subprocess 模塊尤其可以更簡單地處理。


0 人點贊