Java 同步器

2018-02-28 16:02 更新

Java線程教程 - Java同步器


同步器對象與一組線程一起使用。

它維護(hù)一個(gè)狀態(tài),根據(jù)它的狀態(tài),它讓一個(gè)線程通過或強(qiáng)迫它等待。

本節(jié)將討論四種類型的同步器:

  • Semaphores
  • Barriers
  • Latches
  • Exchangers

信號量

信號量用于控制可以訪問資源的線程數(shù)。

java.util.concurrent包中的Semaphore類表示信號量同步器。

您可以使用其構(gòu)造函數(shù)創(chuàng)建信號量,如下所示:

final int MAX_PERMITS  = 3;
Semaphore  s = new Semaphores(MAX_PERMITS);

Semaphore類的另一個(gè)構(gòu)造函數(shù)使用公平作為第二個(gè)參數(shù)

final int MAX_PERMITS  = 3;
Semaphore  s = new Semaphores(MAX_PERMITS,  true); // A  fair  semaphore

如果你創(chuàng)建一個(gè)公平的信號量,在多線程請求許可的情況下,信號量將保證先進(jìn)先出(FIFO)。也就是說,首先請求許可的線程將首先獲得許可。

要獲取許可證,請使用acquire()方法。

如果許可證可用,它立即返回。

它阻止如果許可證不可用。線程在等待許可證可用時(shí)可能中斷。

Semaphore類的其他方法允許您一次性獲取一個(gè)或多個(gè)許可證。要釋放許可證,請使用release()方法。

以下代碼顯示了一個(gè)Restaurant類,它使用信號量來控制對表的訪問。

import java.util.Random;
import java.util.concurrent.Semaphore;

class Restaurant {
  private Semaphore tables;

  public Restaurant(int tablesCount) {
    this.tables = new Semaphore(tablesCount);
  }

  public void getTable(int customerID) {
    try {
      System.out.println("Customer  #" + customerID
          + "  is trying  to  get  a  table.");
      tables.acquire();
      System.out.println("Customer #" + customerID + "  got  a  table.");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void returnTable(int customerID) {
    System.out.println("Customer #" + customerID + "  returned a  table.");
    tables.release();
  }
}
class RestaurantCustomer extends Thread {
  private Restaurant r;
  private int customerID;
  private static final Random random = new Random();

  public RestaurantCustomer(Restaurant r, int customerID) {
    this.r = r;
    this.customerID = customerID;
  }
  public void run() {
    r.getTable(this.customerID); // Get a table
    try {
      int eatingTime = random.nextInt(30) + 1;
      System.out.println("Customer #" + this.customerID
          + "  will eat for " + eatingTime + "  seconds.");
      Thread.sleep(eatingTime * 1000);
      System.out.println("Customer #" + this.customerID
          + "  is done  eating.");
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      r.returnTable(this.customerID);
    }
  }
}
public class Main{
  public static void main(String[] args) {
    Restaurant restaurant = new Restaurant(2);
    for (int i = 1; i <= 5; i++) {
      RestaurantCustomer c = new RestaurantCustomer(restaurant, i);
      c.start();
    }
  }
}

上面的代碼生成以下結(jié)果。

障礙器

屏障使一組線在屏障點(diǎn)匯合。

來自到達(dá)屏障的組的線程等待,直到該組中的所有線程到達(dá)。

一旦組中的最后一個(gè)線程到達(dá)屏障,組中的所有線程都將被釋放。

當(dāng)你有一個(gè)可以分成子任務(wù)的任務(wù)時(shí),你可以使用一個(gè)屏障;每個(gè)子任務(wù)可以在單獨(dú)的線程中執(zhí)行,并且每個(gè)線程必須在共同點(diǎn)處相遇以組合它們的結(jié)果。

java.util.concurrent包中的CyclicBarrier類提供了屏障同步器的實(shí)現(xiàn)。

CyclicBarrier類可以通過調(diào)用其reset()方法來重用。

以下代碼顯示了如何在程序中使用循環(huán)障礙。

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class Worker extends Thread {
  private CyclicBarrier barrier;
  private int ID;
  private static Random random = new Random();

  public Worker(int ID, CyclicBarrier barrier) {
    this.ID = ID;
    this.barrier = barrier;
  }
  public void run() {
    try {
      int workTime = random.nextInt(30) + 1;
      System.out.println("Thread #" + ID + " is going to work for " + workTime + "  seconds");
      Thread.sleep(workTime * 1000);
      System.out.println("Thread #" + ID + " is waiting at the barrier.");
      this.barrier.await();
      System.out.println("Thread #" + ID + " passed the barrier.");
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (BrokenBarrierException e) {
      System.out.println("Barrier is broken.");
    }
  }

}
public class Main {
  public static void main(String[] args) {
    Runnable barrierAction = () -> System.out.println("We are ready.");
    CyclicBarrier barrier = new CyclicBarrier(3, barrierAction);
    for (int i = 1; i <= 3; i++) {
      Worker t = new Worker(i, barrier);
      t.start();
    }
  }
}

上面的代碼生成以下結(jié)果。

Phasers

Phaser提供類似于CyclicBarrier和CountDownLatch同步器的功能。它提供以下功能:

Phaser是可重復(fù)使用的。

在Phaser上同步的參與方數(shù)量可以動(dòng)態(tài)更改。在循環(huán)障礙中,當(dāng)創(chuàng)建障礙時(shí),方的數(shù)量是固定的。

移相器具有相關(guān)的相位編號,從零開始。當(dāng)所有注冊方都到達(dá)移相器時(shí),移相器進(jìn)入下一個(gè)階段,階段編號加1。相位編號的最大值為Integer.MAX_VALUE。在其最大值之后,相位編號重新從零開始。

Phaser有終止?fàn)顟B(tài)。在終止?fàn)顟B(tài)的Phaser上調(diào)用的所有同步方法立即返回,而不等待提前。

移相器有三種類型的參與者計(jì)數(shù):注冊參與者計(jì)數(shù),到達(dá)參與者計(jì)數(shù)和未參與方計(jì)數(shù)。

注冊方數(shù)量是注冊同步的方的數(shù)量。到達(dá)的當(dāng)事方數(shù)目是已經(jīng)到達(dá)移相器的當(dāng)前階段的各方的數(shù)目。

未攜帶者數(shù)量是尚未到達(dá)移動(dòng)器的當(dāng)前階段的各方的數(shù)量。

當(dāng)最后一方到達(dá)時(shí),移相器前進(jìn)到下一階段。

或者,當(dāng)所有注冊方都到達(dá)移動(dòng)器時(shí),Phaser可以執(zhí)行移相器操作。

CyclicBarrier允許您執(zhí)行屏障操作,這是一個(gè)Runnable任務(wù)。

我們通過在Phaser類的onAdvance()方法中編寫代碼來指定移相器操作。

我們需要繼承Phaser類,并覆蓋onAdvance()方法以提供Phaser動(dòng)作。

以下代碼顯示了如何表示通過在Phaser上同步啟動(dòng)的任務(wù)

import java.util.Random;
import java.util.concurrent.Phaser;

class StartTogetherTask extends Thread {
  private Phaser phaser;
  private String taskName;
  private static Random rand = new Random();

  public StartTogetherTask(String taskName, Phaser phaser) {
    this.taskName = taskName;
    this.phaser = phaser;
  }

  @Override
  public void run() {
    System.out.println(taskName + ":Initializing...");
    int sleepTime = rand.nextInt(5) + 1;
    try {
      Thread.sleep(sleepTime * 1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println(taskName + ":Initialized...");
    phaser.arriveAndAwaitAdvance();
    System.out.println(taskName + ":Started...");
  }
}

public class Main {
  public static void main(String[] args) {
    Phaser phaser = new Phaser(1);
    for (int i = 1; i <= 3; i++) {
      phaser.register();
      String taskName = "Task  #" + i;
      StartTogetherTask task = new StartTogetherTask(taskName, phaser);
      task.start();
    }
    phaser.arriveAndDeregister();
  }
}

上面的代碼生成以下結(jié)果。

例子

以下代碼顯示了如何向Phaser添加Phaser Action。

import java.util.concurrent.Phaser;

public class Main {
  public static void main(String[] args) {
    Phaser phaser = new Phaser() {
      protected boolean onAdvance(int phase, int parties) {
        System.out.println("Inside onAdvance(): phase  = " + phase
            + ",  Registered Parties = " + parties);
        // Do not terminate the phaser by returning false
        return false;
      }
    };
    // Register the self (the "main" thread) as a party 
    phaser.register();
    System.out.println("#1: isTerminated():" + phaser.isTerminated());
    phaser.arriveAndDeregister();

    // Trigger another phase advance
    phaser.register();
    phaser.arriveAndDeregister();

    System.out.println("#2: isTerminated():" + phaser.isTerminated());
    phaser.forceTermination();
    System.out.println("#3: isTerminated():" + phaser.isTerminated());
  }
}

上面的代碼生成以下結(jié)果。

例2

以下代碼顯示如何使用移相器生成一些整數(shù)。

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;

class AdderTask extends Thread {
  private Phaser phaser;
  private String taskName;
  private List<Integer> list;

  public AdderTask(String taskName, Phaser phaser, List<Integer> list) {
    this.taskName = taskName;
    this.phaser = phaser;
    this.list = list;
  }

  @Override
  public void run() {
    do {
      System.out.println(taskName + "  added  " + 3);
      list.add(3);
      phaser.arriveAndAwaitAdvance();
    } while (!phaser.isTerminated());
  }
}

public class Main {
  public static void main(String[] args) {
    final int PHASE_COUNT = 2;
    Phaser phaser = new Phaser() {
      public boolean onAdvance(int phase, int parties) {
        System.out.println("Phase:" + phase + ", Parties:" + parties
            + ",  Arrived:" + this.getArrivedParties());
        boolean terminatePhaser = false;
        if (phase >= PHASE_COUNT - 1 || parties == 0) {
          terminatePhaser = true;
        }

        return terminatePhaser;
      }
    };
    List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());
    int ADDER_COUNT = 3;
    phaser.bulkRegister(ADDER_COUNT + 1);
    for (int i = 1; i <= ADDER_COUNT; i++) {
      String taskName = "Task  #" + i;
      AdderTask task = new AdderTask(taskName, phaser, list);
      task.start();
    }
    while (!phaser.isTerminated()) {
      phaser.arriveAndAwaitAdvance();
    }
    int sum = 0;
    for (Integer num : list) {
      sum = sum + num;
    }
    System.out.println("Sum = " + sum);
  }
}

上面的代碼生成以下結(jié)果。

鎖存器

鎖存器使一組線程等待,直到它到達(dá)其終端狀態(tài)。

一旦鎖存器達(dá)到其終端狀態(tài),它允許所有線程通過。

與障礙不同,它是一個(gè)一次性的對象。它不能被重置和重用。

使用鎖存器,其中在一定數(shù)量的一次性活動(dòng)完成之前,多個(gè)活動(dòng)不能進(jìn)行。

例如,一個(gè)服務(wù)不應(yīng)該啟動(dòng),直到它依賴的所有服務(wù)都已啟動(dòng)。

java.util.concurrent包中的CountDownLatch類提供了一個(gè)鎖存器的實(shí)現(xiàn)。

import java.util.concurrent.CountDownLatch;
class LatchHelperService extends Thread {
  private int ID;
  private CountDownLatch latch;
  public LatchHelperService(int ID, CountDownLatch latch) {
    this.ID = ID;
    this.latch = latch;
  }
  public void run() {
    try {
      Thread.sleep(1000);
      System.out.println("Service #" + ID + "  has  started...");
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      this.latch.countDown();
    }
  }
}

class LatchMainService extends Thread {
  private CountDownLatch latch;

  public LatchMainService(CountDownLatch latch) {
    this.latch = latch;
  }
  public void run() {
    try {
      System.out.println("waiting for services to start.");
      latch.await();
      System.out.println("started.");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

public class Main {
  public static void main(String[] args) {
    // Create a countdown latch with 2 as its counter
    CountDownLatch latch = new CountDownLatch(2);
    LatchMainService ms = new LatchMainService(latch);
    ms.start();
    for (int i = 1; i <= 2; i++) {
      LatchHelperService lhs = new LatchHelperService(i, latch);
      lhs.start();
    }
  }
}

上面的代碼生成以下結(jié)果。

交換器

交換器允許兩個(gè)線程在同步點(diǎn)處等待彼此。

當(dāng)兩個(gè)線程到達(dá)時(shí),它們交換一個(gè)對象并繼續(xù)他們的活動(dòng)。

Exchanger類提供了交換器同步器的實(shí)現(xiàn)。

以下代碼顯示將使用交換器與客戶交換數(shù)據(jù)的生產(chǎn)者線程。

import java.util.ArrayList;
import java.util.concurrent.Exchanger;

class ExchangerProducer extends Thread {
  private Exchanger<ArrayList<Integer>> exchanger;
  private ArrayList<Integer> buffer = new ArrayList<Integer>();
  public ExchangerProducer(Exchanger<ArrayList<Integer>> exchanger) {
    this.exchanger = exchanger;
  }

  public void run() {
    while (true) {
      try {
        System.out.println("Producer.");
        Thread.sleep(1000);
        fillBuffer();
        System.out.println("Producer has produced and waiting:" + buffer);
        buffer = exchanger.exchange(buffer);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  public void fillBuffer() {
    for (int i = 0; i <= 3; i++) {
      buffer.add(i);
    }
  }
}

class ExchangerConsumer extends Thread {
  private Exchanger<ArrayList<Integer>> exchanger;
  private ArrayList<Integer> buffer = new ArrayList<Integer>();
  public ExchangerConsumer(Exchanger<ArrayList<Integer>> exchanger) {
    this.exchanger = exchanger;
  }

  public void run() {
    while (true) {
      try {
        System.out.println("Consumer.");
        buffer = exchanger.exchange(buffer);
        System.out.println("Consumer  has received:" + buffer);
        Thread.sleep(1000);
        System.out.println("eating:"+buffer);
        buffer.clear();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }  
}
public class Main {
  public static void main(String[] args) {
    Exchanger<ArrayList<Integer>> exchanger = new Exchanger<>();
    ExchangerProducer producer = new ExchangerProducer(exchanger);
    ExchangerConsumer consumer = new ExchangerConsumer(exchanger);
    producer.start();
    consumer.start();
  }
}

上面的代碼生成以下結(jié)果。

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號