JAVA中生產者-消費者問題的演變

JAVA中生產者-消費者問題的演變

生產者 - 消費者問題是多進程同步問題的典型示例。對於我們大多數人來說,這個問題可能是我們在學校學習並首次面對並行算法的第一個同步問題。它很簡單,它恢復了並行計算中的最大挑戰 - 通過多個進程共享單個資源。

問題陳述

有兩個進程,生產者和使用者,共享一個有限大小的公共緩衝區。生產者“生成”數據並將其存儲在緩衝區中,消費者“消耗”數據,將其從緩衝區中刪除。有兩個並行運行的進程,我們需要確保生成器在緩衝區已滿時不會將新數據放入緩衝區,如果緩衝區為空,消費者不會嘗試從緩衝區中刪除數據。

解決方案

為了解決這個併發問題,生產者和消費者必須相互通信。如果緩衝區已滿,則生產者將進入休眠狀態並等待通知。消費者將從緩衝區中刪除一些數據後,它將通知生產者,然後,生產者將再次開始重新填充緩衝區。如果緩衝區為空,則會發生相同的過程,但在這種情況下,消費者將等待生產者通知。

如果此通信未正確完成,則可能導致死鎖,其中兩個進程將彼此等待。

經典方法

讓我們看看這個問題在Java中是怎麼解決的:

package ProducerConsumer;
import java.util.LinkedList;
import java.util.Queue;
public class ClassicProducerConsumerExample {
public static void main(String[] args) throws InterruptedException {
Buffer buffer = new Buffer(2);
Thread producerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
buffer.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
buffer.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
static class Buffer {
private Queue<integer> list;
private int size;
public Buffer(int size) {
this.list = new LinkedList<>();
this.size = size;
}
public void produce() throws InterruptedException {
int value = 0;
while (true) {
synchronized (this) {
while (list.size() >= size) {

//等待消費者
wait();
}
list.add(value);
System.out.println("Produced " + value);
value++;
// 通知消費者
notify();
Thread.sleep(1000);
}
}
}
public void consume() throws InterruptedException {
while (true) {
synchronized (this) {
while (list.size() == 0) {
// 等待生產者r
wait();
}
int value = list.poll();
System.out.println("Consume " + value);
// 通知生產者
notify();
Thread.sleep(1000);
}
}
}
}
}

/<integer>

這裡我們有兩個線程,一個生產者和一個消費者線程,它們共享一個公共緩衝區。生產者線程開始生成新元素並將它們存儲在緩衝區中。如果緩衝區已滿,它將進入休眠狀態並等待通知。否則,它會在緩衝區中放入一個新元素並通知消費者。就像我之前說的那樣,同樣的過程適用於消費者。如果緩衝區為空,則消費者將等待生產者通知。否則,它將從中刪除一個元素, 並將通知消費者。

如您所見,在前面的示例中,兩個作業都由buffer 對象管理 。該線程只是打電話 , 一切是由這兩種方法來完成,buffer.produce() 和 buffer.consume()。

這是一個值得商榷的主題,但在我看來,緩衝區不應該負責創建或刪除元素。當然,這取決於您想要實現的目標,但在這種情況下,緩衝區應該負責以線程安全的方式存儲和彙集元素,而不是生成元素。

所以我們將生產品和消耗邏輯移出buffer對象:

package ProducerConsumer;
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumerExample2 {
public static void main(String[] args) throws InterruptedException {
Buffer buffer = new Buffer(2);
Thread producerThread = new Thread(() -> {
try {
int value = 0;
while (true) {
buffer.add(value);
System.out.println("Produced " + value);
value ++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
int value = buffer.poll();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();

}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
static class Buffer {
private Queue<integer> list;
private int size;
public Buffer(int size) {
this.list = new LinkedList<>();
this.size = size;
}
public void add(int value) throws InterruptedException {
synchronized (this) {
while (list.size() >= size) {
wait();
}
list.add(value);
notify();
}
}
public int poll() throws InterruptedException {
synchronized (this) {
while (list.size() == 0) {
wait();
}
int value = list.poll();
notify();
return value;
}
}
}
}
/<integer>

現在,緩衝區負責以線程安全的方式存儲和刪除元素。

阻塞隊列

我們可以進一步改進這一點。在前面的示例中,我們創建了一個緩衝區,在存儲元素時,等待一個插槽在沒有更多空間的情況下可用,並且在池化時,如果緩衝區為空,它等待一個元素變得可用,使存儲和刪除操作成為線程安全的。

但是,Java已經有了這個集合。這就是所謂的一 BlockingQueue ,這是一個隊列是線程安全的。它完全符合我們的要求。因此,如果我們BlockingQueue 在示例中使用 ,則不必實現等待和通知機制。

讓我們看看它的代碼示例:

package ProducerConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class ProducerConsumerWithBlockingQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<integer> blockingQueue = new LinkedBlockingDeque<>(2);
Thread producerThread = new Thread(() -> {
try {
int value = 0;
while (true) {
blockingQueue.put(value);
System.out.println("Produced " + value);
value++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
int value = blockingQueue.take();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
/<integer>

與之前完全一樣。它們以相同的方式生產和消費元素。唯一的區別是,我們在這裡使用的是一個 blockingQueue 而不是我們的 buffer 對象。

有關阻塞隊列的一些詳細信息

BlockingQueue有兩種類型

  • 有界隊列
  • 無界隊列

無界隊列幾乎可以無限增長,並且添加操作不會阻塞。我們可以像這樣創建一個無界隊列:

BlockingQueue < String > blockingQueue = new LinkedBlockingDeque <>();

在這種情況下,由於添加操作沒有阻塞,因此生產者不必在添加新元素時等待。每次生產者想要添加新元素時,隊列都會存儲它。但是,這裡有一個問題。如果消費者沒有比生產者添加新元素更快地刪除元素,那麼內存將填滿並且我們將獲得 OutOfMemory 異常。

相反,有界隊列具有固定大小。我們可以創建一個這樣的:

BlockingQueue < String > blockingQueue = new LinkedBlockingDeque <>(10);

主要區別在於使用有界隊列,如果隊列已滿並且生產者嘗試存儲另一個元素,則根據用於添加的方法,隊列將阻塞,直到它有足夠的空間。

在阻塞隊列中添加元素有四種方法:

  • add() - 如果插入成功則返回true,否則返回trueIllegalStateException
  • put() - 在隊列中插入一個元素,並在必要時等待一個空閒插槽
  • offer() -如果插入成功返回true,否則,返回 false
  • offer(E e,long timeout,TimeUnit unit) - 如果元素未滿,則將元素插入隊列,或等待指定超時內的可用插槽

因此,如果我們使用該put() 方法並且隊列已滿,則生產者必須等到有空閒插槽。這就是我們在前面的示例中使用的內容。

使用線程池

我們還能在這方面怎麼改進呢?讓我們來分析一下我們做了什麼。我們實例化了兩個線程,一個將一些元素放入阻塞隊列,生成器,另一個從隊列(消費者)則檢索元素。

但是,良好的軟件技術表明手動創建和銷燬線程是不好的做法。線程創建是一項昂貴的任務。每個線程創建意味著以下步驟:

  • 它為線程堆棧分配內存
  • 操作系統創建與Java線程對應的本機線程
  • 與線程相關的描述符被添加到JVM內部數據結構中

別誤會我的意思。擁有更多線程沒有任何問題。這就是並行性的工作原理。這裡的問題是我們“手動”創建它們。這是不好的做法。如果我們手動創建它們,除了創建成本之外,另一個問題是我們無法控制它們中有多少是同時運行的,例如,如果有數百萬個請求到達服務器應用程序,並且每個請求都會創建一個新線程,那麼數百萬個線程將並行運行,這可能導致線程不足

因此,我們需要一種戰略性管理線程的方法。這裡是線程池。

線程池根據選定的策略處理線程的生命週期。它擁有有限數量的空閒線程,並在需要解決任務時重用它們。這樣,我們不必每次都為新請求創建一個新線程,因此,我們可以避免線程飢餓。

Java線程池實現包括:

  • 任務隊列
  • 工作線程的集合
  • 一個線程工廠
  • 用於管理線程池狀態的元數據。

要同時運行某些任務,必須將它們放在任務隊列中。然後,當一個線程可用時,它將接收一個任務並運行它。可用線程越多,並行執行的任務就越多。

除了線程生命週期管理之外,使用線程池時的另一個優點是,當我們計劃如何拆分要同時執行的工作時,就可以以更實用的方式進行思考。並行的單位不再是線程; 這是任務。我們設計了一些併發執行的任務,而不是一些共享公共內存並且並行運行的線程。以功能方式思考可以幫助我們避免一些常見的多線程問題,例如死鎖或數據爭用。沒有什麼能阻止我們再次陷入這些問題,但是,因為使用功能範例的原因,我們不會強制性地同步併發計算(使用鎖)。這比直接使用線程和共享內存要少得多。在我們的示例中不是這種情況,因為任務共享一個阻塞隊列。

有了所有這些基礎,讓我們看看我們的示例如何使用線程池。

package ProducerConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
public class ProducerConsumerExecutorService {
public static void main(String[] args) {
BlockingQueue<integer> blockingQueue = new LinkedBlockingDeque<>(2);
ExecutorService executor = Executors.newFixedThreadPool(2);
Runnable producerTask = () -> {
try {
int value = 0;
while (true) {
blockingQueue.put(value);
System.out.println("Produced " + value);
value++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable consumerTask = () -> {
try {
while (true) {
int value = blockingQueue.take();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executor.execute(producerTask);
executor.execute(consumerTask);
executor.shutdown();
}
}
/<integer>

這裡的不同之處在於,我們不是手動創建和運行消費者和生產者線程,而是構建一個線程池,它將接收兩個任務,即生產者和消費者任務。生產者和消費者任務實際上與前一個示例中使用的runnable相同。現在,執行程序(線程池實現)接收任務,其工作線程將執行它們。

在我們簡單的例子中,一切都將像以前一樣工作。就像前面的例子一樣,我們仍然有兩個線程,它們仍然以相同的方式生成和使用元素。因此,我們在這裡沒有性能改進,但代碼看起來更乾淨。我們不再手動構建線程,而是指定我們想要的內容。並且,我們想要一種並行執行某些任務的方法。

因此,當我們使用線程池時,就不必將線程視為並行的單位,而是考慮一些併發執行的任務。這就是我們需要知道的,執行者將處理剩下的事情。它將接收一些任務,然後,它將使用可用的工作線程執行它們。

結語

首先,我們看到了消費者 - 生產者問題的“傳統”解決方案。我們試圖在沒有必要的情況下不重新發明輪子,而是重新使用已經測試過的解決方案。那麼,為什麼不使用已經提供的Java阻塞隊列,而不是寫下等待/通知系統?而且,當Java為我們提供一個非常有效地管理線程生命週期的線程池時,我們可以擺脫手動創建線程。通過這些改進,消費者 - 生產者問題的解決方案看起來更加可靠和易懂。


分享到:


相關文章: