Python網(wǎng)絡(luò)編程之線(xiàn)程與進(jìn)程

2018-06-08 17:33 更新

What is a Thread?

線(xiàn)程是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位,它被包含在進(jìn)程之中,是進(jìn)程中的實(shí)際運(yùn)作單位,一條線(xiàn)程指的是進(jìn)程中一個(gè)單一順序的控制流,一個(gè)進(jìn)程中可以并發(fā)多個(gè)線(xiàn)程,每條線(xiàn)程并行執(zhí)行不同的任務(wù)。

在同一個(gè)進(jìn)程內(nèi)的線(xiàn)程的數(shù)據(jù)是可以進(jìn)行互相訪(fǎng)問(wèn)的。

線(xiàn)程的切換使用過(guò)上下文來(lái)實(shí)現(xiàn)的,比如有一本書(shū),有a和b這兩個(gè)人(兩個(gè)線(xiàn)程)看,a看完之后記錄當(dāng)前看到那一頁(yè)哪一行,然后交給b看,b看完之后記錄當(dāng)前看到了那一頁(yè)哪一行,此時(shí)a又要看了,那么a就通過(guò)上次記錄的值(上下文)直接找到上次看到了哪里,然后繼續(xù)往下看。



What is a Process?

一個(gè)進(jìn)程至少要包含一個(gè)線(xiàn)程,每個(gè)進(jìn)程在啟動(dòng)的時(shí)候就會(huì)自動(dòng)的啟動(dòng)一個(gè)線(xiàn)程,進(jìn)程里面的第一個(gè)線(xiàn)程就是主線(xiàn)程,每次在進(jìn)程內(nèi)創(chuàng)建的子線(xiàn)程都是由主線(xiàn)程進(jìn)程創(chuàng)建和銷(xiāo)毀,子線(xiàn)程也可以由主線(xiàn)程創(chuàng)建出來(lái)的線(xiàn)程創(chuàng)建和銷(xiāo)毀線(xiàn)程。

進(jìn)程是對(duì)各種資源管理的集合,比如要調(diào)用內(nèi)存、CPU、網(wǎng)卡、聲卡等,進(jìn)程要操作上述的硬件之前都必須要?jiǎng)?chuàng)建一個(gè)線(xiàn)程,進(jìn)程里面可以包含多個(gè)線(xiàn)程,QQ就是一個(gè)進(jìn)程。

繼續(xù)拿QQ來(lái)說(shuō),比如我現(xiàn)在打卡了QQ的聊天窗口、個(gè)人信息窗口、設(shè)置窗口等,那么每一個(gè)打開(kāi)的窗口都是一個(gè)線(xiàn)程,他們都在執(zhí)行不同的任務(wù),比如聊天窗口這個(gè)線(xiàn)程可以和好友進(jìn)行互動(dòng),聊天,視頻等,個(gè)人信息窗口我可以查看、修改自己的資料。

為了進(jìn)程安全起見(jiàn),所以?xún)蓚€(gè)進(jìn)程之間的數(shù)據(jù)是不能夠互相訪(fǎng)問(wèn)的(默認(rèn)情況下),比如自己寫(xiě)了一個(gè)應(yīng)用程序,然后讓別人運(yùn)行起來(lái),那么我的這個(gè)程序就可以訪(fǎng)問(wèn)用戶(hù)啟動(dòng)的其他應(yīng)用,我可以通過(guò)我自己的程序去訪(fǎng)問(wèn)QQ,然后拿到一些聊天記錄等比較隱秘的信息,那么這個(gè)時(shí)候就不安全了,所以說(shuō)進(jìn)程與進(jìn)程之間的數(shù)據(jù)是不可以互相訪(fǎng)問(wèn)的,而且每一個(gè)進(jìn)程的內(nèi)存是獨(dú)立的。


進(jìn)程與線(xiàn)程的區(qū)別?

  1. 線(xiàn)程是執(zhí)行的指令集,進(jìn)程是資源的集合

  2. 線(xiàn)程的啟動(dòng)速度要比進(jìn)程的啟動(dòng)速度要快

  3. 兩個(gè)線(xiàn)程的執(zhí)行速度是一樣的

  4. 進(jìn)程與線(xiàn)程的運(yùn)行速度是沒(méi)有可比性的

  5. 線(xiàn)程共享創(chuàng)建它的進(jìn)程的內(nèi)存空間,進(jìn)程的內(nèi)存是獨(dú)立的。

  6. 兩個(gè)線(xiàn)程共享的數(shù)據(jù)都是同一份數(shù)據(jù),兩個(gè)子進(jìn)程的數(shù)據(jù)不是共享的,而且數(shù)據(jù)是獨(dú)立的;

  7. 同一個(gè)進(jìn)程的線(xiàn)程之間可以直接交流,同一個(gè)主進(jìn)程的多個(gè)子進(jìn)程之間是不可以進(jìn)行交流,如果兩個(gè)進(jìn)程之間需要通信,就必須要通過(guò)一個(gè)中間代理來(lái)實(shí)現(xiàn);

  8. 一個(gè)新的線(xiàn)程很容易被創(chuàng)建,一個(gè)新的進(jìn)程創(chuàng)建需要對(duì)父進(jìn)程進(jìn)行一次克隆

  9. 一個(gè)線(xiàn)程可以控制和操作同一個(gè)進(jìn)程里的其他線(xiàn)程,線(xiàn)程與線(xiàn)程之間沒(méi)有隸屬關(guān)系,但是進(jìn)程只能操作子進(jìn)程

  10. 改變主線(xiàn)程,有可能會(huì)影響到其他線(xiàn)程的行為,但是對(duì)于父進(jìn)程的修改是不會(huì)影響子進(jìn)程;

一個(gè)多并發(fā)的小腳本

  1. import threading

  2. import time

  3. def Princ(String):

  4.    print('task', String)

  5.    time.sleep(5)

  6. # target=目標(biāo)函數(shù), args=傳入的參數(shù)

  7. t1 = threading.Thread(target=Princ, args=('t1',))

  8. t1.start()

  9. t2 = threading.Thread(target=Princ, args=('t1',))

  10. t2.start()

  11. t3 = threading.Thread(target=Princ, args=('t1',))

  12. t3.start()

參考文檔

進(jìn)程與線(xiàn)程的一個(gè)簡(jiǎn)單解釋
http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html
Linux進(jìn)程與線(xiàn)程的區(qū)別
https://my.oschina.net/cnyinlinux/blog/422207

線(xiàn)程

Thread module emulating a subset of Java’s threading model.

調(diào)用threading模塊調(diào)用線(xiàn)程的兩種方式

直接調(diào)用

  1. import threading

  2. import time

  3. def Princ(String):

  4.    print('task', String)

  5.    time.sleep(5)

  6. # target=目標(biāo)函數(shù), args=傳入的參數(shù)

  7. t1 = threading.Thread(target=Princ, args=('t1',))

  8. t1.start()

  9. t2 = threading.Thread(target=Princ, args=('t1',))

  10. t2.start()

  11. t3 = threading.Thread(target=Princ, args=('t1',))

  12. t3.start()

通過(guò)類(lèi)調(diào)用

  1. import threading

  2. import time

  3. class MyThreading(threading.Thread):

  4.    def __init__(self, conn):

  5.        super(MyThreading, self).__init__()

  6.        self.conn = conn

  7.    def run(self):

  8.        print('run task', self.conn)

  9.        time.sleep(5)

  10. t1 = MyThreading('t1')

  11. t2 = MyThreading('t2')

  12. t1.start()

  13. t2.start()

多線(xiàn)程

多線(xiàn)程在Python內(nèi)實(shí)則就是一個(gè)假象,為什么這么說(shuō)呢,因?yàn)镃PU的處理速度是很快的,所以我們看起來(lái)以一個(gè)線(xiàn)程在執(zhí)行多個(gè)任務(wù),每個(gè)任務(wù)的執(zhí)行速度是非常之快的,利用上下文切換來(lái)快速的切換任務(wù),以至于我們根本感覺(jué)不到。

但是頻繁的使用上下文切換也是要耗費(fèi)一定的資源,因?yàn)閱尉€(xiàn)程在每次切換任務(wù)的時(shí)候需要保存當(dāng)前任務(wù)的上下文。

什么時(shí)候用到多線(xiàn)程?

首先IO操作是不占用CPU的,只有計(jì)算的時(shí)候才會(huì)占用CPU(譬如1+1=2),Python中的多線(xiàn)程不適合CPU密集型的任務(wù),適合IO密集型的任務(wù)(sockt server)。

啟動(dòng)多個(gè)線(xiàn)程

主進(jìn)程在啟動(dòng)之后會(huì)啟動(dòng)一個(gè)主線(xiàn)程,下面的腳本中讓主線(xiàn)程啟動(dòng)了多個(gè)子線(xiàn)程,然而啟動(dòng)的子線(xiàn)程是獨(dú)立的,所以主線(xiàn)程不會(huì)等待子線(xiàn)程執(zhí)行完畢,而是主線(xiàn)程繼續(xù)往下執(zhí)行,并行執(zhí)行。

  1. for i in range(50):

  2.    t = threading.Thread(target=Princ, args=('t-%s' % (i),))

  3.    t.start()

join()

join()方法可以讓程序等待每一個(gè)線(xiàn)程之后完成之后再往下執(zhí)行,又成為串行執(zhí)行。

  1. import threading

  2. import time

  3. def Princ(String):

  4.    print('task', String)

  5.    time.sleep(1)

  6. for i in range(50):

  7.    t = threading.Thread(target=Princ, args=('t-%s' % (i),))

  8.    t.start()

  9.    # 當(dāng)前線(xiàn)程執(zhí)行完畢之后在執(zhí)行后面的線(xiàn)程

  10.    t.join()

讓主線(xiàn)程阻塞,子現(xiàn)在并行執(zhí)行

  1. import threading

  2. import time

  3. def Princ(String):

  4.    print('task', String)

  5.    time.sleep(2)

  6. # 執(zhí)行子線(xiàn)程的時(shí)間

  7. start_time = time.time()

  8. # 存放線(xiàn)程的實(shí)例

  9. t_objs = []

  10. for i in range(50):

  11.    t = threading.Thread(target=Princ, args=('t-%s' % (i),))

  12.    t.start()

  13.    # 為了不讓后面的子線(xiàn)程阻塞,把當(dāng)前的子線(xiàn)程放入到一個(gè)列表中

  14.    t_objs.append(t)

  15. # 循環(huán)所有子線(xiàn)程實(shí)例,等待所有子線(xiàn)程執(zhí)行完畢

  16. for t in t_objs:

  17.    t.join()

  18. # 當(dāng)前時(shí)間減去開(kāi)始時(shí)間就等于執(zhí)行的過(guò)程中需要的時(shí)間

  19. print(time.time() - start_time)

查看主線(xiàn)程與子線(xiàn)程

  1. import threading

  2. class MyThreading(threading.Thread):

  3.    def __init__(self):

  4.        super(MyThreading, self).__init__()

  5.    def run(self):

  6.        print('我是子線(xiàn)程: ', threading.current_thread())

  7. t = MyThreading()

  8. t.start()

  9. print('我是主線(xiàn)程: ', threading.current_thread())

輸出如下:

  1. C:\Python\Python35\python.exe E:/MyCodeProjects/進(jìn)程與線(xiàn)程/s3.py

  2. 我是子線(xiàn)程:  <MyThreading(Thread-1, started 7724)>

  3. 我是主線(xiàn)程:  <_MainThread(MainThread, started 3680)>

  4. Process finished with exit code 0

查看當(dāng)前進(jìn)程的活動(dòng)線(xiàn)程個(gè)數(shù)

  1. import threading

  2. class MyThreading(threading.Thread):

  3.    def __init__(self):

  4.        super(MyThreading, self).__init__()

  5.    def run(self):

  6.        print('www.anshengme.com')

  7. t = MyThreading()

  8. t.start()

  9. print('線(xiàn)程個(gè)數(shù): ', threading.active_count())

輸出如下:

  1. C:\Python\Python35\python.exe E:/MyCodeProjects/進(jìn)程與線(xiàn)程/s3.py

  2. www.anshengme.com

  3. # 一個(gè)主線(xiàn)程和一個(gè)子線(xiàn)程

  4. 線(xiàn)程個(gè)數(shù):  2

  5. Process finished with exit code 0

Event

Event是線(xiàn)程間通信最間的機(jī)制之一:一個(gè)線(xiàn)程發(fā)送一個(gè)event信號(hào),其他的線(xiàn)程則等待這個(gè)信號(hào)。用于主線(xiàn)程控制其他線(xiàn)程的執(zhí)行。 Events 管理一個(gè)flag,這個(gè)flag可以使用set
()設(shè)置成True或者使用clear()重置為False,wait()則用于阻塞,在flag為T(mén)rue之前。flag默認(rèn)為False。

選項(xiàng)描述
Event.wait([timeout])堵塞線(xiàn)程,直到Event對(duì)象內(nèi)部標(biāo)識(shí)位被設(shè)為T(mén)rue或超時(shí)(如果提供了參數(shù)timeout)
Event.set()將標(biāo)識(shí)位設(shè)為T(mén)ure
Event.clear()將標(biāo)識(shí)伴設(shè)為False
Event.isSet()判斷標(biāo)識(shí)位是否為T(mén)ure
  1. #!/use/bin/env python

  2. # _*_ coding: utf-8- _*_

  3. import threading

  4. def runthreading(event):

  5.    print("Start...")

  6.    event.wait()

  7.    print("End...")

  8. event_obj = threading.Event()

  9. for n in range(10):

  10.    t = threading.Thread(target=runthreading, args=(event_obj,))

  11.    t.start()

  12. event_obj.clear()

  13. inp = input("True/False?>> ")

  14. if inp == "True":

  15.    event_obj.set()

  16. `

守護(hù)進(jìn)程(守護(hù)線(xiàn)程)

一個(gè)主進(jìn)程可以啟動(dòng)多個(gè)守護(hù)進(jìn)程,但是主進(jìn)程必須要一直運(yùn)行,如果主進(jìn)程掛掉了,那么守護(hù)進(jìn)程也會(huì)隨之掛掉

程序會(huì)等待主線(xiàn)程(進(jìn)程)執(zhí)行完畢,但是不會(huì)等待守護(hù)進(jìn)程(線(xiàn)程)

  1. import threading

  2. import time

  3. def Princ(String):

  4.    print('task', String)

  5.    time.sleep(2)

  6. for i in range(50):

  7.    t = threading.Thread(target=Princ, args=('t-%s' % (i),))

  8.    t.setDaemon(True)  # 把當(dāng)前線(xiàn)程設(shè)置為守護(hù)線(xiàn)程,要在start之前設(shè)置

  9.    t.start()

場(chǎng)景預(yù)設(shè): 比如現(xiàn)在有一個(gè)FTP服務(wù),每一個(gè)用戶(hù)連接上去的時(shí)候都會(huì)創(chuàng)建一個(gè)守護(hù)線(xiàn)程,現(xiàn)在已經(jīng)有300個(gè)用戶(hù)連接上去了,就是說(shuō)已經(jīng)創(chuàng)建了300個(gè)守護(hù)線(xiàn)程,但是突然之間FTP服務(wù)宕掉了,這個(gè)時(shí)候就不會(huì)等待守護(hù)線(xiàn)程執(zhí)行完畢再退出,而是直接退出,如果是普通的線(xiàn)程,那么就會(huì)登臺(tái)線(xiàn)程執(zhí)行完畢再退出。

  1. #!/use/bin/env python

  2. # _*_ coding:utf-8 _*_

  3. from multiprocessing import Process

  4. import time

  5. def runprocess(arg):

  6.    print(arg)

  7.    time.sleep(2)

  8. p = Process(target=runprocess, args=(11,))

  9. p.daemon=True

  10. p.start()

  11. print("end")

線(xiàn)程之間的數(shù)據(jù)交互與鎖(互斥鎖)

python2.x需要加鎖,但是在python3.x上面就不需要了

  1. # _*_ coding:utf-8 _*_

  2. import threading

  3. def Princ():

  4.    # 獲取鎖

  5.    lock.acquire()

  6.    # 在函數(shù)內(nèi)可以直接修改全局變量

  7.    global number

  8.    number += 1

  9.    # 為了避免讓程序出現(xiàn)串行,不能加sleep

  10.    # time.sleep(1)

  11.    # 釋放鎖

  12.    lock.release()

  13. # 鎖

  14. lock = threading.Lock()

  15. # 主線(xiàn)程的number

  16. number = 0

  17. t_objs = []

  18. for i in range(100):

  19.    t = threading.Thread(target=Princ)

  20.    t.start()

  21.    t_objs.append(t)

  22. for t in t_objs:

  23.    t.join()

  24. print('Number:', number)

遞歸鎖(Lock/RLock)

  1. import threading

  2. def run1():

  3.    print("grab the first part data")

  4.    lock.acquire()

  5.    global num

  6.    num += 1

  7.    lock.release()

  8.    return num

  9. def run2():

  10.    print("grab the second part data")

  11.    lock.acquire()

  12.    global num2

  13.    num2 += 1

  14.    lock.release()

  15.    return num2

  16. def run3():

  17.    lock.acquire()

  18.    res = run1()

  19.    print('--------between run1 and run2-----')

  20.    res2 = run2()

  21.    lock.release()

  22.    print(res, res2)

  23. t_objs = []

  24. if __name__ == '__main__':

  25.    num, num2 = 0, 0

  26.    lock = threading.RLock()  # RLock()類(lèi)似創(chuàng)建了一個(gè)字典,每次退出的時(shí)候找到字典的值進(jìn)行退出

  27.    # lock = threading.Lock()  # Lock()會(huì)阻塞在這兒

  28.    for i in range(10):

  29.        t = threading.Thread(target=run3)

  30.        t.start()

  31.        t_objs.append(t)

  32. for t in t_objs:

  33.    t.join()

  34. print(num, num2)

信號(hào)量(Semaphore)

互斥鎖同時(shí)只允許一個(gè)線(xiàn)程更改數(shù)據(jù),而Semaphore是同時(shí)允許一定數(shù)量的線(xiàn)程更改數(shù)據(jù)

  1. import threading

  2. import time

  3. def run(n):

  4.    semaphore.acquire()  # 獲取信號(hào),信號(hào)可以有多把鎖

  5.    time.sleep(1)  # 等待一秒鐘

  6.    print("run the thread: %s\n" % n)

  7.    semaphore.release()  # 釋放信號(hào)

  8. t_objs = []

  9. if __name__ == '__main__':

  10.    semaphore = threading.BoundedSemaphore(5)  # 聲明一個(gè)信號(hào)量,最多允許5個(gè)線(xiàn)程同時(shí)運(yùn)行

  11.    for i in range(20):  # 運(yùn)行20個(gè)線(xiàn)程

  12.        t = threading.Thread(target=run, args=(i,))  # 創(chuàng)建線(xiàn)程

  13.        t.start()  # 啟動(dòng)線(xiàn)程

  14.        t_objs.append(t)

  15. for t in t_objs:

  16.    t.join()

  17. print('>>>>>>>>>>>>>')

以上代碼中,類(lèi)似與創(chuàng)建了一個(gè)隊(duì)列,最多放5個(gè)任務(wù),每執(zhí)行完成一個(gè)任務(wù)就會(huì)往后面增加一個(gè)任務(wù)。

多進(jìn)程

多進(jìn)程的資源是獨(dú)立的,不可以互相訪(fǎng)問(wèn)。

啟動(dòng)一個(gè)進(jìn)程

  1. from multiprocessing import Process

  2. import time

  3. def f(name):

  4.    time.sleep(2)

  5.    print('hello', name)

  6. if __name__ == '__main__':

  7.    # 創(chuàng)建一個(gè)進(jìn)程

  8.    p = Process(target=f, args=('bob',))

  9.    # 啟動(dòng)

  10.    p.start()

  11.    # 等待進(jìn)程執(zhí)行完畢

  12.    p.join(

在進(jìn)程內(nèi)啟動(dòng)一個(gè)線(xiàn)程

  1. from multiprocessing import Process

  2. import threading

  3. def Thread(String):

  4.    print(String)

  5. def Proces(String):

  6.    print('hello', String)

  7.    t = threading.Thread(target=Thread, args=('Thread %s' % (String),))  # 創(chuàng)建一個(gè)線(xiàn)程

  8.    t.start()  # 啟動(dòng)它

  9. if __name__ == '__main__':

  10.    p = Process(target=Proces, args=('World',))  # 創(chuàng)建一個(gè)進(jìn)程

  11.    p.start()  # 啟動(dòng)

  12.    p.join()  # 等待進(jìn)程執(zhí)行完畢

啟動(dòng)一個(gè)多進(jìn)程

  1. from multiprocessing import Process

  2. import time

  3. def f(name):

  4.    time.sleep(2)

  5.    print('hello', name)

  6. if __name__ == '__main__':

  7.    for n in range(10):  # 創(chuàng)建一個(gè)進(jìn)程

  8.        p = Process(target=f, args=('bob %s' % (n),))

  9.        # 啟動(dòng)

  10.        p.start()

  11.        # 等待進(jìn)程執(zhí)行完畢

獲取啟動(dòng)進(jìn)程的PID

  1. # _*_ coding:utf-8 _*_

  2. from multiprocessing import Process

  3. import os

  4. def info(String):

  5.    print(String)

  6.    print('module name:', __name__)

  7.    print('父進(jìn)程的PID:', os.getppid())

  8.    print('子進(jìn)程的PID:', os.getpid())

  9.    print("\n")

  10. def ChildProcess():

  11.    info('\033[31;1mChildProcess\033[0m')

  12. if __name__ == '__main__':

  13.    info('\033[32;1mTheParentProcess\033[0m')

  14.    p = Process(target=ChildProcess)

  15.    p.start()

輸出結(jié)果

  1. C:\Python\Python35\python.exe E:/MyCodeProjects/多進(jìn)程/s1.py

  2. TheParentProcess

  3. module name: __main__

  4. # Pycharm的PID

  5. 父進(jìn)程的PID: 6888

  6. # 啟動(dòng)的腳本PID

  7. 子進(jìn)程的PID: 4660

  8. ChildProcess

  9. module name: __mp_main__

  10. # 腳本的PID

  11. 父進(jìn)程的PID: 4660

  12. # 父進(jìn)程啟動(dòng)的子進(jìn)程PID

  13. 子進(jìn)程的PID: 8452

  14. Process finished with exit code 0

進(jìn)程間通信

默認(rèn)情況下進(jìn)程與進(jìn)程之間是不可以互相通信的,若要實(shí)現(xiàn)互相通信則需要一個(gè)中間件,另個(gè)進(jìn)程之間通過(guò)中間件來(lái)實(shí)現(xiàn)通信,下面是進(jìn)程間通信的幾種方式。

進(jìn)程Queue

  1. # _*_ coding:utf-8 _*_

  2. from multiprocessing import Process, Queue

  3. def ChildProcess(Q):

  4.    Q.put(['Hello', None, 'World'])  # 在Queue里面上傳一個(gè)列表

  5. if __name__ == '__main__':

  6.    q = Queue()  # 創(chuàng)建一個(gè)Queue

  7.    p = Process(target=ChildProcess, args=(q,))  # 創(chuàng)建一個(gè)子進(jìn)程,并把Queue傳給子進(jìn)程,相當(dāng)于克隆了一份Queue

  8.    p.start()  # 啟動(dòng)子進(jìn)程

  9.    print(q.get())  # 獲取q中的數(shù)據(jù)

  10.    p.join()

管道(Pipes)

  1. # _*_ coding:utf-8 _*_

  2. from multiprocessing import Process, Pipe

  3. def ChildProcess(conn):

  4.    conn.send(['Hello', None, 'World'])  # 寫(xiě)一段數(shù)據(jù)

  5.    conn.close()  # 關(guān)閉

  6. if __name__ == '__main__':

  7.    parent_conn, child_conn = Pipe()  # 生成一個(gè)管道實(shí)例,parent_conn, child_conn管道的兩頭

  8.    p = Process(target=ChildProcess, args=(child_conn,))

  9.    p.start()

  10.    print(parent_conn.recv())  # 收取消息

  11.    p.join()

數(shù)據(jù)共享(Managers)

  1. # _*_ coding:utf-8 _*_

  2. # _*_ coding:utf-8 _*_

  3. from multiprocessing import Process, Manager

  4. import os

  5. def ChildProcess(Dict, List):

  6.    Dict['k1'] = 'v1'

  7.    Dict['k2'] = 'v2'

  8.    List.append(os.getpid())  # 獲取子進(jìn)程的PID

  9.    print(List)  # 輸出列表中的內(nèi)容

  10. if __name__ == '__main__':

  11.    manager = Manager()  # 生成Manager對(duì)象

  12.    Dict = manager.dict()  # 生成一個(gè)可以在多個(gè)進(jìn)程之間傳遞共享的字典

  13.    List = manager.list()  # 生成一個(gè)字典

  14.    ProcessList = []  # 創(chuàng)建一個(gè)空列表,存放進(jìn)程的對(duì)象,等待子進(jìn)程執(zhí)行用于

  15.    for i in range(10):  # 生成是個(gè)子進(jìn)程

  16.        p = Process(target=ChildProcess, args=(Dict, List))  # 創(chuàng)建一個(gè)子進(jìn)程

  17.        p.start()  # 啟動(dòng)

  18.        ProcessList.append(p)  # 把子進(jìn)程添加到p_list列表中

  19.    for res in ProcessList:  # 循環(huán)所有的子進(jìn)程

  20.        res.join()  # 等待執(zhí)行完畢

  21.    print('\n')

  22.    print(Dict)

  23.    print(List)

輸出結(jié)果

  1. C:\Python\Python35\python.exe E:/MyCodeProjects/多進(jìn)程/s4.py

  2. [5112]

  3. [5112, 3448]

  4. [5112, 3448, 4584]

  5. [5112, 3448, 4584, 2128]

  6. [5112, 3448, 4584, 2128, 11124]

  7. [5112, 3448, 4584, 2128, 11124, 10628]

  8. [5112, 3448, 4584, 2128, 11124, 10628, 5512]

  9. [5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460]

  10. [5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460, 10484]

  11. [5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460, 10484, 6804]

  12. {'k1': 'v1', 'k2': 'v2'}

  13. [5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460, 10484, 6804]

  14. Process finished with exit code 0

鎖(Lock)

  1. from multiprocessing import Process, Lock

  2. def ChildProcess(l, i):

  3.    l.acquire()  # 獲取鎖

  4.    print('hello world', i)

  5.    l.release()  # 釋放鎖

  6. if __name__ == '__main__':

  7.    lock = Lock()  # 生成Lock對(duì)象

  8.    for num in range(10):

  9.        Process(target=ChildProcess, args=(lock, num)).start()  # 創(chuàng)建并啟動(dòng)一個(gè)子進(jìn)程

進(jìn)程池

同一時(shí)間啟動(dòng)多少個(gè)進(jìn)程

  1. #!/use/bin/env python

  2. # _*_ coding: utf-8 _*_

  3. from multiprocessing import Pool

  4. import time

  5. def myFun(i):

  6.    time.sleep(2)

  7.    return i+100

  8. def end_call(arg):

  9.    print("end_call>>", arg)

  10. p = Pool(5)  # 允許進(jìn)程池內(nèi)同時(shí)放入5個(gè)進(jìn)程

  11. for i in range(10):

  12.    p.apply_async(func=myFun, args=(i,),callback=end_call) # # 平行執(zhí)行,callback是主進(jìn)程來(lái)調(diào)用

  13.    # p.apply(func=Foo)  # 串行執(zhí)行

  14. print("end")

  15. p.close()

  16. p.join() # 進(jìn)程池中進(jìn)程執(zhí)行完畢后再關(guān)閉,如果注釋?zhuān)敲闯绦蛑苯雨P(guān)閉。

線(xiàn)程池

簡(jiǎn)單實(shí)現(xiàn)

  1. #!/usr/bin/env python

  2. # -*- coding:utf-8 -*-

  3. import threading

  4. import queue

  5. import time

  6. class MyThread:

  7.    def __init__(self,max_num=10):

  8.        self.queue = queue.Queue()

  9.        for n in range(max_num):

  10.            self.queue.put(threading.Thread)

  11.    def get_thread(self):

  12.        return self.queue.get()

  13.    def put_thread(self):

  14.        self.queue.put(threading.Thread)

  15. pool = MyThread(5)

  16. def RunThread(arg,pool):

  17.    print(arg)

  18.    time.sleep(2)

  19.    pool.put_thread()

  20. for n in range(30):

  21.    thread = pool.get_thread()

  22.    t = thread(target=RunThread, args=(n,pool,))

  23.    t.start()

復(fù)雜版本

  1. #!/usr/bin/env python

  2. # -*- coding:utf-8 -*-

  3. import queue

  4. import threading

  5. import contextlib

  6. import time

  7. StopEvent = object()

  8. class ThreadPool(object):

  9.    def __init__(self, max_num, max_task_num = None):

  10.        if max_task_num:

  11.            self.q = queue.Queue(max_task_num)

  12.        else:

  13.            self.q = queue.Queue()

  14.        self.max_num = max_num

  15.        self.cancel = False

  16.        self.terminal = False

  17.        self.generate_list = []

  18.        self.free_list = []

  19.    def run(self, func, args, callback=None):

  20.        """

  21.        線(xiàn)程池執(zhí)行一個(gè)任務(wù)

  22.        :param func: 任務(wù)函數(shù)

  23.        :param args: 任務(wù)函數(shù)所需參數(shù)

  24.        :param callback: 任務(wù)執(zhí)行失敗或成功后執(zhí)行的回調(diào)函數(shù),回調(diào)函數(shù)有兩個(gè)參數(shù)1、任務(wù)函數(shù)執(zhí)行狀態(tài);2、任務(wù)函數(shù)返回值(默認(rèn)為None,即:不執(zhí)行回調(diào)函數(shù))

  25.        :return: 如果線(xiàn)程池已經(jīng)終止,則返回True否則None

  26.        """

  27.        if self.cancel:

  28.            return

  29.        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:

  30.            self.generate_thread()

  31.        w = (func, args, callback,)

  32.        self.q.put(w)

  33.    def generate_thread(self):

  34.        """

  35.        創(chuàng)建一個(gè)線(xiàn)程

  36.        """

  37.        t = threading.Thread(target=self.call)

  38.        t.start()

  39.    def call(self):

  40.        """

  41.        循環(huán)去獲取任務(wù)函數(shù)并執(zhí)行任務(wù)函數(shù)

  42.        """

  43.        current_thread = threading.currentThread()

  44.        self.generate_list.append(current_thread)

  45.        event = self.q.get()

  46.        while event != StopEvent:

  47.            func, arguments, callback = event

  48.            try:

  49.                result = func(*arguments)

  50.                success = True

  51.            except Exception as e:

  52.                success = False

  53.                result = None

  54.            if callback is not None:

  55.                try:

  56.                    callback(success, result)

  57.                except Exception as e:

  58.                    pass

  59.            with self.worker_state(self.free_list, current_thread):

  60.                if self.terminal:

  61.                    event = StopEvent

  62.                else:

  63.                    event = self.q.get()

  64.        else:

  65.            self.generate_list.remove(current_thread)

  66.    def close(self):

  67.        """

  68.        執(zhí)行完所有的任務(wù)后,所有線(xiàn)程停止

  69.        """

  70.        self.cancel = True

  71.        full_size = len(self.generate_list)

  72.        while full_size:

  73.            self.q.put(StopEvent)

  74.            full_size -= 1

  75.    def terminate(self):

  76.        """

  77.        無(wú)論是否還有任務(wù),終止線(xiàn)程

  78.        """

  79.        self.terminal = True

  80.        while self.generate_list:

  81.            self.q.put(StopEvent)

  82.        self.q.queue.clear()

  83.    @contextlib.contextmanager

  84.    def worker_state(self, state_list, worker_thread):

  85.        """

  86.        用于記錄線(xiàn)程中正在等待的線(xiàn)程數(shù)

  87.        """

  88.        state_list.append(worker_thread)

  89.        try:

  90.            yield

  91.        finally:

  92.            state_list.remove(worker_thread)

  93. # How to use

  94. pool = ThreadPool(5)

  95. def callback(status, result):

  96.    # status, execute action status

  97.    # result, execute action return value

  98.    pass

  99. def action(i):

  100.    print(i)

  101. for i in range(30):

  102.    ret = pool.run(action, (i,), callback)

  103. time.sleep(5)

  104. print(len(pool.generate_list), len(pool.free_list))

  105. print(len(pool.generate_list), len(pool.free_list))

  106. pool.close()

  107. pool.terminate()

什么是IO密集型和CPU密集型?

IO密集型(I/O bound)

頻繁網(wǎng)絡(luò)傳輸、讀取硬盤(pán)及其他IO設(shè)備稱(chēng)之為IO密集型,最簡(jiǎn)單的就是硬盤(pán)存取數(shù)據(jù),IO操作并不會(huì)涉及到CPU。

計(jì)算密集型(CPU bound)

程序大部分在做計(jì)算、邏輯判斷、循環(huán)導(dǎo)致cpu占用率很高的情況,稱(chēng)之為計(jì)算密集型,比如說(shuō)python程序中執(zhí)行了一段代碼1+1,這就是在計(jì)算1+1的值


本文出自 “一盞燭光” 博客,謝絕轉(zhuǎn)載!

以上內(nèi)容是否對(duì)您有幫助:
在線(xiàn)筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)