JAVA中生产者-消费者问题的演变

JAVA中生产者-消费者问题的演变

生产者 - 消费者问题是多进程同步问题的典型示例。对于我们大多数人来说,这个问题可能是我们在学校学习并首次面对并行算法的第一个同步问题。它很简单,它恢复了并行计算中的最大挑战 - 通过多个进程共享单个资源。

问题陈述

有两个进程,生产者和使用者,共享一个有限大小的公共缓冲区。生产者“生成”数据并将其存储在缓冲区中,消费者“消耗”数据,将其从缓冲区中删除。有两个并行运行的进程,我们需要确保生成器在缓冲区已满时不会将新数据放入缓冲区,如果缓冲区为空,消费者不会尝试从缓冲区中删除数据。

解决方案

为了解决这个并发问题,生产者和消费者必须相互通信。如果缓冲区已满,则生产者将进入休眠状态并等待通知。消费者将从缓冲区中删除一些数据后,它将通知生产者,然后,生产者将再次开始重新填充缓冲区。如果缓冲区为空,则会发生相同的过程,但在这种情况下,消费者将等待生产者通知。

如果此通信未正确完成,则可能导致死锁,其中两个进程将彼此等待。

经典方法

让我们看看这个问题在Java中是怎么解决的:

package ProducerConsumer;
import java.util.LinkedList;
import java.util.Queue;
public class ClassicProducerConsumerExample {
public static void main(String[] args) throws InterruptedException {
Buffer buffer = new Buffer(2);
Thread producerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
buffer.produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread consumerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
buffer.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
static class Buffer {
private Queue<integer> list;
private int size;
public Buffer(int size) {
this.list = new LinkedList<>();
this.size = size;
}
public void produce() throws InterruptedException {
int value = 0;
while (true) {
synchronized (this) {
while (list.size() >= size) {

//等待消费者
wait();
}
list.add(value);
System.out.println("Produced " + value);
value++;
// 通知消费者
notify();
Thread.sleep(1000);
}
}
}
public void consume() throws InterruptedException {
while (true) {
synchronized (this) {
while (list.size() == 0) {
// 等待生产者r
wait();
}
int value = list.poll();
System.out.println("Consume " + value);
// 通知生产者
notify();
Thread.sleep(1000);
}
}
}
}
}

/<integer>

这里我们有两个线程,一个生产者和一个消费者线程,它们共享一个公共缓冲区。生产者线程开始生成新元素并将它们存储在缓冲区中。如果缓冲区已满,它将进入休眠状态并等待通知。否则,它会在缓冲区中放入一个新元素并通知消费者。就像我之前说的那样,同样的过程适用于消费者。如果缓冲区为空,则消费者将等待生产者通知。否则,它将从中删除一个元素, 并将通知消费者。

如您所见,在前面的示例中,两个作业都由buffer 对象管理 。该线程只是打电话 , 一切是由这两种方法来完成,buffer.produce() 和 buffer.consume()。

这是一个值得商榷的主题,但在我看来,缓冲区不应该负责创建或删除元素。当然,这取决于您想要实现的目标,但在这种情况下,缓冲区应该负责以线程安全的方式存储和汇集元素,而不是生成元素。

所以我们将生产品和消耗逻辑移出buffer对象:

package ProducerConsumer;
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumerExample2 {
public static void main(String[] args) throws InterruptedException {
Buffer buffer = new Buffer(2);
Thread producerThread = new Thread(() -> {
try {
int value = 0;
while (true) {
buffer.add(value);
System.out.println("Produced " + value);
value ++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
int value = buffer.poll();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();

}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
static class Buffer {
private Queue<integer> list;
private int size;
public Buffer(int size) {
this.list = new LinkedList<>();
this.size = size;
}
public void add(int value) throws InterruptedException {
synchronized (this) {
while (list.size() >= size) {
wait();
}
list.add(value);
notify();
}
}
public int poll() throws InterruptedException {
synchronized (this) {
while (list.size() == 0) {
wait();
}
int value = list.poll();
notify();
return value;
}
}
}
}
/<integer>

现在,缓冲区负责以线程安全的方式存储和删除元素。

阻塞队列

我们可以进一步改进这一点。在前面的示例中,我们创建了一个缓冲区,在存储元素时,等待一个插槽在没有更多空间的情况下可用,并且在池化时,如果缓冲区为空,它等待一个元素变得可用,使存储和删除操作成为线程安全的。

但是,Java已经有了这个集合。这就是所谓的一 BlockingQueue ,这是一个队列是线程安全的。它完全符合我们的要求。因此,如果我们BlockingQueue 在示例中使用 ,则不必实现等待和通知机制。

让我们看看它的代码示例:

package ProducerConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class ProducerConsumerWithBlockingQueue {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<integer> blockingQueue = new LinkedBlockingDeque<>(2);
Thread producerThread = new Thread(() -> {
try {
int value = 0;
while (true) {
blockingQueue.put(value);
System.out.println("Produced " + value);
value++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread consumerThread = new Thread(() -> {
try {
while (true) {
int value = blockingQueue.take();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
/<integer>

与之前完全一样。它们以相同的方式生产和消费元素。唯一的区别是,我们在这里使用的是一个 blockingQueue 而不是我们的 buffer 对象。

有关阻塞队列的一些详细信息

BlockingQueue有两种类型

  • 有界队列
  • 无界队列

无界队列几乎可以无限增长,并且添加操作不会阻塞。我们可以像这样创建一个无界队列:

BlockingQueue < String > blockingQueue = new LinkedBlockingDeque <>();

在这种情况下,由于添加操作没有阻塞,因此生产者不必在添加新元素时等待。每次生产者想要添加新元素时,队列都会存储它。但是,这里有一个问题。如果消费者没有比生产者添加新元素更快地删除元素,那么内存将填满并且我们将获得 OutOfMemory 异常。

相反,有界队列具有固定大小。我们可以创建一个这样的:

BlockingQueue < String > blockingQueue = new LinkedBlockingDeque <>(10);

主要区别在于使用有界队列,如果队列已满并且生产者尝试存储另一个元素,则根据用于添加的方法,队列将阻塞,直到它有足够的空间。

在阻塞队列中添加元素有四种方法:

  • add() - 如果插入成功则返回true,否则返回trueIllegalStateException
  • put() - 在队列中插入一个元素,并在必要时等待一个空闲插槽
  • offer() -如果插入成功返回true,否则,返回 false
  • offer(E e,long timeout,TimeUnit unit) - 如果元素未满,则将元素插入队列,或等待指定超时内的可用插槽

因此,如果我们使用该put() 方法并且队列已满,则生产者必须等到有空闲插槽。这就是我们在前面的示例中使用的内容。

使用线程池

我们还能在这方面怎么改进呢?让我们来分析一下我们做了什么。我们实例化了两个线程,一个将一些元素放入阻塞队列,生成器,另一个从队列(消费者)则检索元素。

但是,良好的软件技术表明手动创建和销毁线程是不好的做法。线程创建是一项昂贵的任务。每个线程创建意味着以下步骤:

  • 它为线程堆栈分配内存
  • 操作系统创建与Java线程对应的本机线程
  • 与线程相关的描述符被添加到JVM内部数据结构中

别误会我的意思。拥有更多线程没有任何问题。这就是并行性的工作原理。这里的问题是我们“手动”创建它们。这是不好的做法。如果我们手动创建它们,除了创建成本之外,另一个问题是我们无法控制它们中有多少是同时运行的,例如,如果有数百万个请求到达服务器应用程序,并且每个请求都会创建一个新线程,那么数百万个线程将并行运行,这可能导致线程不足

因此,我们需要一种战略性管理线程的方法。这里是线程池。

线程池根据选定的策略处理线程的生命周期。它拥有有限数量的空闲线程,并在需要解决任务时重用它们。这样,我们不必每次都为新请求创建一个新线程,因此,我们可以避免线程饥饿。

Java线程池实现包括:

  • 任务队列
  • 工作线程的集合
  • 一个线程工厂
  • 用于管理线程池状态的元数据。

要同时运行某些任务,必须将它们放在任务队列中。然后,当一个线程可用时,它将接收一个任务并运行它。可用线程越多,并行执行的任务就越多。

除了线程生命周期管理之外,使用线程池时的另一个优点是,当我们计划如何拆分要同时执行的工作时,就可以以更实用的方式进行思考。并行的单位不再是线程; 这是任务。我们设计了一些并发执行的任务,而不是一些共享公共内存并且并行运行的线程。以功能方式思考可以帮助我们避免一些常见的多线程问题,例如死锁或数据争用。没有什么能阻止我们再次陷入这些问题,但是,因为使用功能范例的原因,我们不会强制性地同步并发计算(使用锁)。这比直接使用线程和共享内存要少得多。在我们的示例中不是这种情况,因为任务共享一个阻塞队列。

有了所有这些基础,让我们看看我们的示例如何使用线程池。

package ProducerConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
public class ProducerConsumerExecutorService {
public static void main(String[] args) {
BlockingQueue<integer> blockingQueue = new LinkedBlockingDeque<>(2);
ExecutorService executor = Executors.newFixedThreadPool(2);
Runnable producerTask = () -> {
try {
int value = 0;
while (true) {
blockingQueue.put(value);
System.out.println("Produced " + value);
value++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable consumerTask = () -> {
try {
while (true) {
int value = blockingQueue.take();
System.out.println("Consume " + value);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
};
executor.execute(producerTask);
executor.execute(consumerTask);
executor.shutdown();
}
}
/<integer>

这里的不同之处在于,我们不是手动创建和运行消费者和生产者线程,而是构建一个线程池,它将接收两个任务,即生产者和消费者任务。生产者和消费者任务实际上与前一个示例中使用的runnable相同。现在,执行程序(线程池实现)接收任务,其工作线程将执行它们。

在我们简单的例子中,一切都将像以前一样工作。就像前面的例子一样,我们仍然有两个线程,它们仍然以相同的方式生成和使用元素。因此,我们在这里没有性能改进,但代码看起来更干净。我们不再手动构建线程,而是指定我们想要的内容。并且,我们想要一种并行执行某些任务的方法。

因此,当我们使用线程池时,就不必将线程视为并行的单位,而是考虑一些并发执行的任务。这就是我们需要知道的,执行者将处理剩下的事情。它将接收一些任务,然后,它将使用可用的工作线程执行它们。

结语

首先,我们看到了消费者 - 生产者问题的“传统”解决方案。我们试图在没有必要的情况下不重新发明轮子,而是重新使用已经测试过的解决方案。那么,为什么不使用已经提供的Java阻塞队列,而不是写下等待/通知系统?而且,当Java为我们提供一个非常有效地管理线程生命周期的线程池时,我们可以摆脱手动创建线程。通过这些改进,消费者 - 生产者问题的解决方案看起来更加可靠和易懂。


分享到:


相關文章: