App下載

python實現(xiàn)多進(jìn)程并發(fā)控制Semaphore與互斥鎖LOCK

猿友 2021-08-03 11:22:09 瀏覽數(shù) (2354)
反饋

學(xué)習(xí)過操作系統(tǒng)的小伙伴應(yīng)該對資源的控制有所了解——資源是一定的,不能讓所有進(jìn)程無控制地進(jìn)行搶占。這就要求我們對進(jìn)行進(jìn)行并發(fā)控制。那么python是如何實現(xiàn)多進(jìn)程并發(fā)控制的呢?主要是使用互斥鎖和新號量來控制進(jìn)程資源調(diào)度。接下來小編就來介紹怎么使用python實現(xiàn)多進(jìn)程并發(fā)控制吧。

一、了解鎖

應(yīng)用場景舉例描述: Lock 互斥鎖:舉例說明–有三個同事同時需要上廁所,但是只有一間廁所,將同事比作進(jìn)程,多個進(jìn)程并打搶占一個廁所,我們要保證順序優(yōu)先, 一個個來,那么就必須串行,先到先得,有人使用了,就鎖住,結(jié)束后剩余的人繼續(xù)爭搶

1、利用Lock來處理

模擬三個同事?lián)屨紟?/p>

from multiprocessing import Process
from multiprocessing import Lock
import time
import random

def task1(p, lock):
    # 上鎖
    lock.acquire()
    print(f'{p} 開始排泄')
    time.sleep(random.randint(1, 3))
    print(f'{p} 排泄結(jié)束')
    # 解鎖
    lock.release()

def task2(p, lock):
    lock.acquire()
    print(f'{p} 開始排泄')
    time.sleep(random.randint(1, 3))
    print(f'{p} 排泄結(jié)束')
    lock.release()

def task3(p, lock):
    lock.acquire()
    print(f'{p} 開始排泄')
    time.sleep(random.randint(1, 3))
    print(f'{p} 排泄結(jié)束')
    lock.release()


if __name__ == '__main__':
    # 實例化一個鎖對象
    mutex = Lock()
    # 將鎖以參數(shù)的形式傳入進(jìn)程對象
    p1 = Process(target=task1, args=('task1', mutex,))
    p2 = Process(target=task2, args=('task2', mutex,))
    p3 = Process(target=task3, args=('task3', mutex,))

    p1.start()
    p2.start()
    p3.start()

執(zhí)行結(jié)果:

# 輸出結(jié)果:三個進(jìn)程開始爭搶互斥鎖,先搶到的先執(zhí)行,執(zhí)行結(jié)束后,釋放掉鎖,剩下的繼續(xù)爭搶
task1 開始排泄
task1 排泄結(jié)束
task2 開始排泄
task2 排泄結(jié)束
task3 開始排泄
task3 排泄結(jié)束

1、 注意:

  • 互斥鎖在函數(shù)中,lock.acquire()上鎖一次就要lock.release()解鎖一次,在上鎖與解鎖之間寫需要執(zhí)行的代碼。
  • 如果連續(xù)上鎖兩次以上,就會出現(xiàn)死鎖現(xiàn)象,代碼將不繼續(xù)執(zhí)行下去。必須是鎖一次解一次。

2、 lock和join比較:

  • 共同點------都可以把并行變成串行,保證了執(zhí)行順序
  • 不同點------join是人為設(shè)定了順序,lock是讓其爭搶順序,保證了公平性

2、利用反射,來優(yōu)化上面的代碼

上面的代碼雖然起到了先進(jìn)先出,一進(jìn)一出的效果,但是并不完美。總所周知,我們上廁所是誰先搶到誰先上,并不是說按照代碼里start()順序執(zhí)行。應(yīng)該由先搶占到的進(jìn)程限制性才更合理。

from multiprocessing import Process
from multiprocessing import Lock
import time
import random
import sys

def task1(p, lock):
    # 上鎖
    lock.acquire()
    print(f'{p} 開始打印')
    time.sleep(random.randint(1, 3))
    print(f'{p} 打印結(jié)束')
    # 解鎖
    lock.release()

def task2(p, lock):
    lock.acquire()
    print(f'{p} 開始打印')
    time.sleep(random.randint(1, 3))
    print(f'{p} 打印結(jié)束')
    lock.release()

def task3(p, lock):
    lock.acquire()
    print(f'{p} 開始打印')
    time.sleep(random.randint(1, 3))
    print(f'{p} 打印結(jié)束')
    lock.release()


if __name__ == '__main__':
    slock = Lock()
    for i in range(1,4):
       p = Process(target=getattr(sys.modules[__name__], f'task{i}'), args=(f'task{i}', slock))
       p.start()

輸出結(jié)果:

task2 開始打印
task2 打印結(jié)束
task3 開始打印
task3 打印結(jié)束
task1 開始打印
task1 打印結(jié)束

二、進(jìn)程并發(fā)控制 semaphore

semaphore(信號量):用來控制對共享資源的訪問數(shù)量,可以控制同一時刻并發(fā)的進(jìn)程數(shù)
信號量: 也是一把鎖,但是不保證數(shù)據(jù)安全性,同時開啟多個線程,但是規(guī)定了同時并發(fā)執(zhí)行的上限,后面走多少,進(jìn)多少。(用于控制并發(fā)數(shù)量)

1.多進(jìn)程控制示例(1)

# 舉例說明:一間廁所有5個坑位,最多只能同時有5個人上廁所,當(dāng)前時刻有20個人想上廁所,但是只能讓5個人進(jìn)去,然后出來多少個,才能進(jìn)去多少個上廁所

# 從一個模塊引用多個功能的時候,用逗號隔開
from threading import Semaphore, Thread, currentThread
import time
import random

sem = Semaphore(3)             # 并發(fā)執(zhí)行數(shù)設(shè)置為5

def task():
    sem.acquire()
    print(f'{currentThread().name}')
    time.sleep(random.randint(1,3))
    sem.release()

if __name__ == '__main__':
    for i in range(20):
        t = Thread(target=task)
        t.start()

執(zhí)行結(jié)果:首次并發(fā)量是3,后面先搶到鎖先執(zhí)行

Thread-1
Thread-2
Thread-3

Thread-4
Thread-5

Thread-6
Thread-7

Thread-8

Process finished with exit code 0

2.多進(jìn)程控制示例(2)

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(time.strftime('%Y-%m-%d %H:%M:%S'), multiprocessing.current_process().name + " 搶占并獲得鎖,運行")
    time.sleep(i)
    print(time.strftime('%Y-%m-%d %H:%M:%S'), multiprocessing.current_process().name + " 運行結(jié)束,釋放鎖")
    s.release()

if __name__ == '__main__':
    s = multiprocessing.Semaphore(2)
    for i in range(8):
        p = multiprocessing.Process(target=worker, args=(s, 1))
        p.start()

執(zhí)行結(jié)果:

在執(zhí)行結(jié)果輸出的終端,每阻塞一次,按下回車鍵,可以更加清晰的看出進(jìn)程的并發(fā)執(zhí)行。
由下面執(zhí)行結(jié)果可以看出,同一時刻,有兩個進(jìn)程在執(zhí)行
2021-05-18 22:50:37 Process-1 搶占并獲得鎖,運行
2021-05-18 22:50:37 Process-2 搶占并獲得鎖,運行

2021-05-18 22:50:38 Process-1 運行結(jié)束,釋放鎖
2021-05-18 22:50:38 Process-3 搶占并獲得鎖,運行
2021-05-18 22:50:38 Process-2 運行結(jié)束,釋放鎖
2021-05-18 22:50:38 Process-4 搶占并獲得鎖,運行

2021-05-18 22:50:39 Process-3 運行結(jié)束,釋放鎖
2021-05-18 22:50:39 Process-5 搶占并獲得鎖,運行
2021-05-18 22:50:39 Process-4 運行結(jié)束,釋放鎖
2021-05-18 22:50:39 Process-6 搶占并獲得鎖,運行

2021-05-18 22:50:40 Process-5 運行結(jié)束,釋放鎖
2021-05-18 22:50:40 Process-7 搶占并獲得鎖,運行
2021-05-18 22:50:40 Process-6 運行結(jié)束,釋放鎖
2021-05-18 22:50:40 Process-8 搶占并獲得鎖,運行

2021-05-18 22:50:41 Process-7 運行結(jié)束,釋放鎖
2021-05-18 22:50:41 Process-8 運行結(jié)束,釋放鎖

Process finished with exit code 0

三、進(jìn)程同步之LOCK

多個進(jìn)程并發(fā)執(zhí)行,提高資源利用率,從而提高效率,但是有時候我們需要在某一時刻只能有一個進(jìn)程訪問某個共享資源的話,就需要使用鎖LOCK

1.不加LOCK的示例

import multiprocessing
import time

def task1():
    n = 4
    while n > 1:
        print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task1 輸出信息')
        time.sleep(1)
        n -= 1

def task2():
    n = 4
    while n > 1:
        print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task2 輸出信息')
        time.sleep(1)
        n -= 1

def task3():
    n = 4
    while n > 1:
        print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task3 輸出信息')
        time.sleep(1)
        n -= 1

if __name__ == '__main__':
    p1 = multiprocessing.Process(target=task1)
    p2 = multiprocessing.Process(target=task2)
    p3 = multiprocessing.Process(target=task3)
    p1.start()
    p2.start()
    p3.start()

執(zhí)行結(jié)果:

2021-59-18 22:59:46  task1 輸出信息
2021-59-18 22:59:46  task2 輸出信息
2021-59-18 22:59:46  task3 輸出信息

2021-59-18 22:59:47  task1 輸出信息
2021-59-18 22:59:47  task2 輸出信息
2021-59-18 22:59:47  task3 輸出信息

2021-59-18 22:59:48  task1 輸出信息
2021-59-18 22:59:48  task2 輸出信息
2021-59-18 22:59:48  task3 輸出信息

Process finished with exit code 0

2.加上LOCK的示例

有兩種加鎖方式:首先將 lock = multiprocessing.Lock() 生成鎖對象lock

  1. with lock: with會在執(zhí)行前啟動lock,在執(zhí)行結(jié)束后關(guān)閉lock
  2. lock.acquire() … lock.release() : 注意,這倆必須是一個接一個的對應(yīng)關(guān)系
import multiprocessing

import time

def task1(lock):
    with lock:
        n = 4
        while n > 1:
            print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task1 輸出信息')
            time.sleep(1)
            n -= 1

def task2(lock):
    lock.acquire()
    n = 4
    while n > 1:
        print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task2 輸出信息')
        time.sleep(1)
        n -= 1
    lock.release()

def task3(lock):
    lock.acquire()
    n = 4
    while n > 1:
        print(f'{time.strftime("%Y-%M-%d %H:%M:%S")}  task3 輸出信息')
        time.sleep(1)
        n -= 1
    lock.release()

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    p1 = multiprocessing.Process(target=task1, args=(lock,))
    p2 = multiprocessing.Process(target=task2, args=(lock,))
    p3 = multiprocessing.Process(target=task3, args=(lock,))
    p1.start()
    p2.start()
    p3.start()

執(zhí)行結(jié)果

2021-11-18 23:11:37  task1 輸出信息

2021-11-18 23:11:38  task1 輸出信息

2021-11-18 23:11:39  task1 輸出信息

2021-11-18 23:11:40  task2 輸出信息

2021-11-18 23:11:41  task2 輸出信息

2021-11-18 23:11:42  task2 輸出信息

2021-11-18 23:11:43  task3 輸出信息

2021-11-18 23:11:44  task3 輸出信息

2021-11-18 23:11:45  task3 輸出信息

Process finished with exit code 0
 

到此這篇python實現(xiàn)多進(jìn)程并發(fā)控制的文章就介紹到這了,更多python進(jìn)程相關(guān)的學(xué)習(xí)內(nèi)容請搜索W3Cschool以前的文章或繼續(xù)瀏覽下面的相關(guān)文章。

0 人點贊