在多线程编程中,不同线程可能需要相互协作、共享数据,线程间通过某种方式互相告知自己的状态,以避免无效的资源争夺,这就是线程间通信。线程间通信的目的是确保多个线程能够有效地协调工作,以完成特定任务或避免数据不一致性。
线程间通信的方式可以有很多种:等待-通知、共享内存、管道流。每种方式用不同的方法来实现,这里介绍等待-通知的通信方式。“等待-通知”通信方式是Java中使用普遍的线程间通信方式,其经典的案例是“生产者-消费者”模式。
一、wait方法和notify方法
Java对象中的wait方法和notify方法是Java中实现等待通知机制最重要的两个方法,它们只能在synchronized关键字修饰的同步代码块中使用。
1、wait方法
对象的wait()方法的主要作用是让当前线程阻塞并等待被唤醒。wait()方法与对象监视器紧密相关,使用wait()方法时一定要放在同步块中。其调用形式如下
//同步代码块
synchronized(lock){
lock.wait();
......
}
对象的wait()方法的核心原理大致如下:
(1)当线程调用了lock(某个同步锁对象)的wait()方法后,JVM会将当前线程加入lock监视器的WaitSet(等待集),等待被其他线程唤醒。
(2)当前线程会释放lock对象监视器的Owner权利,让其他线程可以抢夺lock对象的监视器。
(3)让当前线程等待,其状态变成WAITING。
2、notify方法
对象的notify()方法的主要作用是唤醒在等待的线程。notify()方法与对象监视器紧密相关,调用notify()方法时也需要放在同步块中。其调用形式如下
//同步代码块
synchronized(lock){
lock.notify();
......
}
notify()方法有两个版本:
版本一:void notify()
notify()方法的主要作用为:locko.notify()调用后,唤醒locko监视器等待集中的第一条等待线程;被唤醒的线程进入EntryList,其状态从WAITING变成BLOCKED。
版本二:void notifyAll()
locko.notifyAll()被调用后,唤醒locko监视器等待集中的全部等待线程,所有被唤醒的线程进入EntryList,线程状态从WAITING变成BLOCKED。
对象的notify()或者notifyAll()方法的核心原理大致如下:
(1)当线程调用了locko(某个同步锁对象)的notify()方法后,JVM会唤醒locko监视器WaitSet中的第一条等待线程。
(2)当线程调用了locko的notifyAll()方法后,JVM会唤醒locko监视器WaitSet中的所有等待线程。
(3)等待线程被唤醒后,会从监视器的WaitSet移动到EntryList,线程具备了排队抢夺监视器Owner权利的资格,其状态从WAITING变成BLOCKED。
(4)EntryList中的线程抢夺到监视器的Owner权利之后,线程的状态从BLOCKED变成Runnable,具备重新执行的资格。
二、单生产者单消费者
接下来使用wait/notify机制模拟一个生产者生产出产品放在容器中,一个消费者从容器中取走产品的过程,示意图如下:
在这个过程中,可能会出现的需要注意的点:
- 容器只有一个,生产者生产完如果消费者没有及时把产品取走,生产者需要等待,等消费者取走产品之后才能生产下一个产品。
- 消费者消费速度比较快,有可能消费完之后生产者还没有把下一个产品生产出来,这时候消费者需要等待,等待生产者生产出来产品之后才能取走产品。
代码实现:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.LockSupport;
/**
* @author kdyzm
* @date 2024/11/6
*/
@Slf4j
public class Main {
/**
* 容器类
*/
@Data
static class Container {
/**
* 生产的产品编号
*/
private int num;
/**
* 当前容器的状态,false:生产中,消费者等待;true:消费中,生产者等待
*/
private boolean status;
}
/**
* 生产者类
*/
@Data
@AllArgsConstructor
static class Producer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
synchronized (container) {
//如果消费者正在消费,需要等待
if (container.status) {
container.wait();
}
//模拟生产用的时间
LockSupport.parkUntil(System.currentTimeMillis() + 1000);
container.num++;
log.info("生产完成第 {} 个产品", container.num);
//设置标志已经生产完成
container.status = true;
//唤醒消费者来消费
container.notify();
}
}
}
}
/**
* 消费者类
*/
@Data
@AllArgsConstructor
static class Consumer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
synchronized (container) {
//如果生产者正在生产,需要等待
if (!container.status) {
container.wait();
}
//模拟消费耗时
LockSupport.parkUntil(System.currentTimeMillis() + 1000);
log.info("消费完成第 {} 个产品", container.num);
//设置标志已经消费完成
container.status = false;
//通知生产者可以生产了
container.notify();
}
}
}
}
public static void main(String[] args) {
Container container = new Container();
Thread producer = new Thread(new Producer(container));
Thread consumer = new Thread(new Consumer(container));
producer.start();
consumer.start();
}
}
运行结果:
这让我想起了有些面试题中要两个线程交替打印奇偶数,实际上是换汤不换药,其实也是使用了等待通知机制,和上述的单生产者、单消费者几乎一样。实现代码如下:
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
* @author kdyzm
* @date 2024/11/6
*/
@Slf4j
public class PrintOddAndEven {
private static int i = 0;
private static final Object lock = new Object();
/**
* 打印奇数
*/
static class PrintOdd implements Runnable {
@SneakyThrows
@Override
public void run() {
while (i <= 99) {
synchronized (lock) {
//如果是偶数就等待
if (i % 2 == 0) {
lock.wait();
}
log.info("奇数【{}】", i);
i++;
lock.notify();
}
}
}
}
/**
* 打印偶数
*/
static class PrintEven implements Runnable {
@SneakyThrows
@Override
public void run() {
while (i <= 99) {
synchronized (lock) {
//如果是奇数就等待
if (i % 2 == 1) {
lock.wait();
}
log.info("偶数【{}】", i);
i++;
lock.notify();
}
}
}
}
public static void main(String[] args) {
Thread printOddThread = new Thread(new PrintOdd(), "奇数线程");
Thread printEvenThread = new Thread(new PrintEven(), "偶数线程");
printOddThread.start();
printEvenThread.start();
}
}
还是比较简单的。
话说回来,现实情况中生产者和消费者不仅仅只有一个,而且容器所能容纳的产品也不只一个,相对于单生产者、单消费者和只有一个容器的场景,多生产者、多消费者的场景要复杂的多。以“厨师-烤鸭-顾客”为例:多位厨师做完烤鸭后放在盘子里,多位顾客取走盘子里的烤鸭。
接下来基于“单生产者单消费者”代码,一步一步改造成支持多生产者、多消费者、多容器场景的代码。
三、单容器-多生产者消费者
首先看看在只有一个盘子的情况下多位做烤鸭的厨师、多位顾客的场景。
1、直接增加线程:线程安全性问题
基于之前的代码,是否直接增加生产者和消费者线程数量就可以了?来看看改造的代码
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.LockSupport;
/**
* @author kdyzm
* @date 2024/11/6
*/
@Slf4j
public class MultipleProducerAndConsumer {
/**
* 容器类
*/
@Data
static class Container {
/**
* 生产的产品编号
*/
private int num;
/**
* 当前容器的状态,false:生产中,消费者等待;true:消费中,生产者等待
*/
private boolean status;
}
/**
* 生产者类
*/
@Data
@AllArgsConstructor
static class Producer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
synchronized (container) {
//如果消费者正在消费,需要等待
if (container.status) {
container.wait();
}
//模拟生产用的时间
LockSupport.parkUntil(System.currentTimeMillis() + 1000);
container.num++;
log.info("生产完成第 {} 个产品", container.num);
//设置标志已经生产完成
container.status = true;
//唤醒消费者来消费
container.notify();
}
}
}
}
/**
* 消费者类
*/
@Data
@AllArgsConstructor
static class Consumer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
synchronized (container) {
//如果生产者正在生产,需要等待
if (!container.status) {
container.wait();
}
//模拟消费耗时
LockSupport.parkUntil(System.currentTimeMillis() + 1000);
log.info("消费完成第 {} 个产品", container.num);
//设置标志已经消费完成
container.status = false;
//通知生产者可以生产了
container.notify();
}
}
}
}
public static void main(String[] args) {
Container container = new Container();
Thread producer1 = new Thread(new Producer(container));
Thread producer2 = new Thread(new Producer(container));
Thread producer3 = new Thread(new Producer(container));
Thread consumer1 = new Thread(new Consumer(container));
Thread consumer2 = new Thread(new Consumer(container));
Thread consumer3 = new Thread(new Consumer(container));
producer1.start();
producer2.start();
producer3.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}
运行结果:
一开始运行的好好的,结果生产到第4个烤鸭的时候,出现了烤鸭生产出来但是无人消费的情况,直接干到第5个烤鸭了,这很明显出现了线程安全性问题。除了多生产了无人消费,还出现了重复消费的问题
为什么会出现线程安全性问题呢?
先说下结论,问题在于同步代码块中的第一块代码
//如果消费者正在消费,需要等待
if (container.status) {
container.wait();
}
以及
//如果生产者正在生产,需要等待
if (!container.status) {
container.wait();
}
首先得明白,由于这些线程共用了同一把锁,所以能进入同步代码块的线程同一时间只有一个,但是仅仅这样并不能保证线程安全。基于上面的代码我们来看看如下场景:
假设线程1/2/3是生产者线程,4/5/6是消费者线程。
① 线程1先生产了一个烤鸭,然后将状态置为待消费状态,下次循环有获取到了锁,但是已经有烤鸭了,所以它进入了等待队列,线程变成了等待状态;
② 线程2/3依次获取到了锁,它们发现烤鸭已经做好了,所以它们全部进入等待队列,变成等待状态。此时消费者线程4/5/6均处于阻塞状态还未获取过锁。
③ 线程4获取到了锁,第一次消费了烤鸭,并唤醒了等待队列中的1/2/3线程中的线程1,自己再次获取到锁并进入等待状态。此时等待队列中有生产者线程2/3以及消费者线程4
④ 被唤醒的线程1重新获取到锁以后开始生产烤鸭,生产完成之后随机唤醒等待队列中的一条线程,这时候就要出现问题了:如果唤醒了消费者线程4则没啥问题,但是如果唤醒了生产者线程2/3就麻烦了,等待队列中的线程一旦被唤醒,重新获取到锁以后就会在之前wait方法调用的地方继续运行
//如果消费者正在消费,需要等待
if (container.status) {
container.wait();
}
被唤醒的生产者线程并没有判断状态,直接去生产烤鸭去了,明明上一个生产的烤鸭还没消费,就又生产了一个烤鸭,这就造成了“多生产,未消费”的现象;同理,如果等待队列中有多个消费者线程,消费者线程消费完烤鸭之后又唤醒了等待队列中的其它消费者线程,则同一个烤鸭又会被重复消费,造成“重复消费同一只烤鸭”的现象。
2、if改while:死锁问题
上一步经过分析,已经找到问题了,那就好解决了
//如果消费者正在消费,需要等待
if (container.status) {
container.wait();
}
把if判断改成while循环判断:
//如果消费者正在消费,需要等待
while (container.status) {
container.wait();
}
这样线程被唤醒之后需要继续判断状态才能进行下一步操作。
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.LockSupport;
/**
* @author kdyzm
* @date 2024/11/6
*/
@Slf4j
public class MultipleProducerAndConsumer {
/**
* 容器类
*/
@Data
static class Container {
/**
* 生产的产品编号
*/
private int num;
/**
* 当前容器的状态,false:生产中,消费者等待;true:消费中,生产者等待
*/
private boolean status;
}
/**
* 生产者类
*/
@Data
@AllArgsConstructor
static class Producer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
synchronized (container) {
//如果消费者正在消费,需要等待
while (container.status) {
container.wait();
}
//模拟生产用的时间
LockSupport.parkUntil(System.currentTimeMillis() + 200);
container.num++;
log.info("生产完成第 {} 个产品", container.num);
//设置标志已经生产完成
container.status = true;
//唤醒消费者来消费
container.notify();
}
}
}
}
/**
* 消费者类
*/
@Data
@AllArgsConstructor
static class Consumer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
synchronized (container) {
//如果生产者正在生产,需要等待
while (!container.status) {
container.wait();
}
//模拟消费耗时
LockSupport.parkUntil(System.currentTimeMillis() + 200);
log.info("消费完成第 {} 个产品", container.num);
//设置标志已经消费完成
container.status = false;
//通知生产者可以生产了
container.notify();
}
}
}
}
public static void main(String[] args) {
Container container = new Container();
Thread producer1 = new Thread(new Producer(container));
Thread producer2 = new Thread(new Producer(container));
Thread producer3 = new Thread(new Producer(container));
Thread consumer1 = new Thread(new Consumer(container));
Thread consumer2 = new Thread(new Consumer(container));
Thread consumer3 = new Thread(new Consumer(container));
producer1.start();
producer2.start();
producer3.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}
运行代码结果如下
通过jstack命令,能够看到6个线程都处于WAITING状态,没锁,产生死锁了。哎,一波刚平,一波又起,这次又是什么原因呢?
先说结论,这次是因为唤醒语句lock.notify
还是假设线程1/2/3是生产者线程,4/5/6是消费者线程。
①线程4/5/6依次获取到了锁,但是由于烤鸭还未生产,所以它们依次进入到了等待队列,线程变成了等待状态。
②线程1先生产了一个烤鸭,然后将状态置为待消费状态,并唤醒了消费线程4;下次循环有获取到了锁,但是已经有烤鸭了,所以它进入了等待队列,线程变成了等待状态;
③线程2/3依次获取到了锁,它们发现烤鸭已经做好了,所以它们全部进入等待队列,变成等待状态。此时消费者线程4/5/6均处于阻塞状态还未获取过锁。此时除了线程4处于阻塞状态,其余线程均处于等待状态。
④线程4重新被唤醒后获取到了锁,第一次消费了烤鸭,之后它要唤醒等待队列中的1/2/3/5/6线程中的其中一条线程。如果能唤醒1/2/3生产者线程中的其中一条还没问题,如果它唤醒了5/6线程的话,就麻烦了:如果线程4唤醒了消费者线程5,线程5发现状态是待生产状态,线程5就会进入等待队列;线程4由于状态未被更改,下次循环获取到了锁,判断状态还是待生产状态,它也会进入等待队列。这样六条线程就都进入等待队列了。死锁就形成了。
3、终版代码
上一步分析了出现死锁的原因,其实就是notify方法唤醒错了线程,解决方法也很粗暴:使用notifyAll方法唤醒所有线程。虽然有部分线程是没有必要唤醒的,但是这样能解决死锁问题。
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.LockSupport;
/**
* @author kdyzm
* @date 2024/11/6
*/
@Slf4j
public class MultipleProducerAndConsumer {
/**
* 容器类
*/
@Data
static class Container {
/**
* 生产的产品编号
*/
private int num;
/**
* 当前容器的状态,false:生产中,消费者等待;true:消费中,生产者等待
*/
private boolean status;
}
/**
* 生产者类
*/
@Data
@AllArgsConstructor
static class Producer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
synchronized (container) {
//如果消费者正在消费,需要等待
while (container.status) {
container.wait();
}
//模拟生产用的时间
LockSupport.parkUntil(System.currentTimeMillis() + 200);
container.num++;
log.info("生产完成第 {} 个产品", container.num);
//设置标志已经生产完成
container.status = true;
//唤醒消费者来消费
container.notifyAll();
}
}
}
}
/**
* 消费者类
*/
@Data
@AllArgsConstructor
static class Consumer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
synchronized (container) {
//如果生产者正在生产,需要等待
while (!container.status) {
container.wait();
}
//模拟消费耗时
LockSupport.parkUntil(System.currentTimeMillis() + 200);
log.info("消费完成第 {} 个产品", container.num);
//设置标志已经消费完成
container.status = false;
//通知生产者可以生产了
container.notifyAll();
}
}
}
}
public static void main(String[] args) {
Container container = new Container();
Thread producer1 = new Thread(new Producer(container));
Thread producer2 = new Thread(new Producer(container));
Thread producer3 = new Thread(new Producer(container));
Thread consumer1 = new Thread(new Consumer(container));
Thread consumer2 = new Thread(new Consumer(container));
Thread consumer3 = new Thread(new Consumer(container));
producer1.start();
producer2.start();
producer3.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}
运行结果
这样问题就解决了。
四、显式锁和Condition方案
上一节使用内部锁synchronized实现了单容器下多生产者、消费者,它遗留了一个问题:生产者生产完成之后,消费者消费完之后都要调用notifyAll方法唤醒所有等待队列中的线程,实际上生产者生产完成之后只需要唤醒消费者线程,消费者消费完成之后只需要唤醒生产者线程,然而传统的synchronized内部锁并不支持这种单独唤醒某一组线程的功能。使用显式锁+Condition的方案可以解决内部锁等待唤醒机制中的notifyAll的缺陷,接下来将使用它逐步替换掉单容器下的多生产者、消费者中的内部锁实现。
1、显式锁和Condition
显式锁之前说过了,它需要显式调用lock和unlock方法实现加锁和解锁,获取方式就是Lock lock=new ReentrantLock();
,需要注意的是为了能够保证unlock方法的执行,必须将unlock方法放在finally块中执行。
Condition相当于原来synchronized内部锁的“监视器”,原来synchronized关键字的监视器是内置的ObjectMonitor,一把锁一个监视器,而在显式锁中则将监视器分离出来显式处理了,这样就允许一个显式锁上有多个监视器。Condition接口封装了三个重要的方法:await() 、signal() 、signalAll(),这三个方法对应着Object类中的wait() 、notify() 、notifyAll()方法。
为什么要使用Condition接口?
由于显式锁Lock接口的出现,synchronized关键字出现了可替代方案,但显式锁上并没有wait() 、notify() 、notifyAll()这三个重要的方法,这是因为显式锁中一把锁上可以有多个监视器,如果这三个方法称为Lock接口中的成员,将会使得一把锁上只能有一个监视器,和内部锁相比就没有了优势。使用Condition和Lock分离的方式可以分离锁和监视器,使得不通的线程可以归属不同的监视器。**换句话说,Condition对竞争同一把锁的线程进行了“分组”,在唤醒线程的时候可以根据分组局部唤醒线程,这样就解决了原来内部锁notifyAll的缺陷。**使用Condition接口的最大好处就是可以指定唤醒的线程是对方的上线程还是己方的线程,这样就能大大的提高工作效率。
Condition对象的获取需要使用显式锁对象:Condition con=lock.newCondition();
2、notifyAll平替方案
首先使用显式锁+Condition的方案重写上一章节“单容器-多生产者消费者”中的最终代码,这里还是唤醒全部线程。
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kdyzm
* @date 2024/11/6
*/
@Slf4j
public class LockMultipleProducerAndConsumer {
static final Lock LOCK = new ReentrantLock();
static final Condition CONDITION = LOCK.newCondition();
/**
* 容器类
*/
@Data
static class Container {
/**
* 生产的产品编号
*/
private int num;
/**
* 当前容器的状态,false:生产中,消费者等待;true:消费中,生产者等待
*/
private boolean status;
}
/**
* 生产者类
*/
@Data
@AllArgsConstructor
static class Producer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
LOCK.lock();
try {
//如果消费者正在消费,需要等待
while (container.status) {
CONDITION.await();
}
//模拟生产用的时间
LockSupport.parkUntil(System.currentTimeMillis() + 200);
container.num++;
log.info("生产完成第 {} 个产品", container.num);
//设置标志已经生产完成
container.status = true;
//唤醒消费者来消费
CONDITION.signalAll();
} finally {
LOCK.unlock();
}
}
}
}
/**
* 消费者类
*/
@Data
@AllArgsConstructor
static class Consumer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
LOCK.lock();
try {
//如果生产者正在生产,需要等待
while (!container.status) {
CONDITION.await();
}
//模拟消费耗时
LockSupport.parkUntil(System.currentTimeMillis() + 200);
log.info("消费完成第 {} 个产品", container.num);
//设置标志已经消费完成
container.status = false;
//通知生产者可以生产了
CONDITION.signalAll();
} finally {
LOCK.unlock();
}
}
}
}
public static void main(String[] args) {
Container container = new Container();
Thread producer1 = new Thread(new Producer(container));
Thread producer2 = new Thread(new Producer(container));
Thread producer3 = new Thread(new Producer(container));
Thread consumer1 = new Thread(new Consumer(container));
Thread consumer2 = new Thread(new Consumer(container));
Thread consumer3 = new Thread(new Consumer(container));
producer1.start();
producer2.start();
producer3.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}
其运行结果和以前相同。这样虽然使用显式锁重写了,但是还是唤醒了全部的线程,其运行效率和原先相比并没有更高。
3、使用Condition局部唤醒线程
在单容器-多生产者消费者的案例中,生产者生产完成以后其实只需要通知一个等待中的消费者即可,同理,消费者消费完成以后也只需要通知一个等待中的生产者,这是最合理效率最高的方式,接下来使用两个Condition来解决该问题。
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kdyzm
* @date 2024/11/6
*/
@Slf4j
public class LockMultipleProducerAndConsumerUseMultipleCondition {
/**
* 显式锁
*/
static final Lock LOCK = new ReentrantLock();
/**
* 用于监视生产者线程的监视器
*/
static final Condition PRODUCER_CONDITION = LOCK.newCondition();
/**
* 用于监视消费者线程的监视器
*/
static final Condition CONSUMER_CONDITION = LOCK.newCondition();
/**
* 容器类
*/
@Data
static class Container {
/**
* 生产的产品编号
*/
private int num;
/**
* 当前容器的状态,false:生产中,消费者等待;true:消费中,生产者等待
*/
private boolean status;
}
/**
* 生产者类
*/
@Data
@AllArgsConstructor
static class Producer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
LOCK.lock();
try {
//如果消费者正在消费,需要等待
while (container.status) {
PRODUCER_CONDITION.await();
}
//模拟生产用的时间
LockSupport.parkUntil(System.currentTimeMillis() + 200);
container.num++;
log.info("生产完成第 {} 个产品", container.num);
//设置标志已经生产完成
container.status = true;
//唤醒一个消费者来消费
CONSUMER_CONDITION.signal();
} finally {
LOCK.unlock();
}
}
}
}
/**
* 消费者类
*/
@Data
@AllArgsConstructor
static class Consumer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
LOCK.lock();
try {
//如果生产者正在生产,需要等待
while (!container.status) {
CONSUMER_CONDITION.await();
}
//模拟消费耗时
LockSupport.parkUntil(System.currentTimeMillis() + 200);
log.info("消费完成第 {} 个产品", container.num);
//设置标志已经消费完成
container.status = false;
//唤醒一个生产者去生产了
PRODUCER_CONDITION.signal();
} finally {
LOCK.unlock();
}
}
}
}
public static void main(String[] args) {
Container container = new Container();
Thread producer1 = new Thread(new Producer(container));
Thread producer2 = new Thread(new Producer(container));
Thread producer3 = new Thread(new Producer(container));
Thread consumer1 = new Thread(new Consumer(container));
Thread consumer2 = new Thread(new Consumer(container));
Thread consumer3 = new Thread(new Consumer(container));
producer1.start();
producer2.start();
producer3.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}
注意这里使用了两个Condition用于分别监视生产者线程组和消费者线程组,唤醒线程的时候只在对应的分组中唤醒一条,这样避免了notifyAll唤醒全部线程的缺陷。
五、多容器-多生产者多消费者
之前讨论的多生产者多消费者问题所用的容器类中只能存放1个烤鸭,多个生产者争着生产这一个烤鸭,多个消费者争着消费这一个烤鸭,这在实际生活中是不存在的。应当改造成这样:有一个可以存放多个烤鸭的容器,生产者将烤鸭生产完毕之后放入容器,消费者在后面消费,当容器满了,生产者停下等待消费者消费,消费者一旦消费掉一个烤鸭,就告诉生产者容器内可以存放新烤鸭了,生产者于是开始生产新的烤鸭并放在空位上;消费者发现容器空了,则等待生产者生产出烤鸭,生产者一旦生产出烤鸭就立即通知消费者容器内已经有烤鸭了,可以消费了。这才是一个生产者消费者问题的实际流程。
让我们分析下这个过程如何实现:
- 要能存放多个烤鸭,可以使用固定长度的数组来存放
- 生产者等待的时机:数组满了代表容器满了没有办法再继续制作烤鸭,生产者应该进入等待队列等待消费者消费
- 消费者等待的时机:数组为空代表已经没有烤鸭可以消费了,消费者应该进入等待队列等待生产者生产
- 因为数组长度是固定的,所以需要使用一个变量来记录真实的烤鸭数量,以判定谁需要去等待队列等待。
- 生产者和消费者假设都从头开始生产和消费,它们分别需要一个指针用来记录当前生产和消费的位置,当指针到达队列尾部,需要从头开始生产或者消费。
基于上面的分析,使用显式锁实现多容器-多生产者消费者的案例
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kdyzm
* @date 2024/11/6
*/
@Slf4j
public class LockMultipleProducerAndConsumerUseMultipleContainer {
/**
* 容器类
*/
@Data
static class Container {
/**
* 显式锁
*/
private final Lock lock = new ReentrantLock();
/**
* 用于通知生产者线程的监视器
*/
private final Condition notFull = lock.newCondition();
/**
* 用于通知消费者线程的监视器
*/
private final Condition notEmpty = lock.newCondition();
/**
* 可以存放5个烤鸭的容器
*/
private Object[] items = new Object[5];
/**
* 容器中烤鸭的真实数量
*/
private int count;
/**
* 生产者存放烤鸭的指针
*/
private int putptr;
/**
* 消费者消费烤鸭的指针
*/
private int takeptr;
@SneakyThrows
public void put() {
lock.lock();
try {
//如果队列已满,生产者等待
while (count == items.length) {
notFull.await();
}
//模拟生产用的时间
Object obj = new Object();
items[putptr] = obj;
//如果到达队尾,从头开始生产
if (++putptr == items.length) {
putptr = 0;
}
count++;
log.info("生产完成第 {} 个产品,产品总数量:{}", putptr, count);
//唤醒一个消费者来消费
notEmpty.signal();
} finally {
lock.unlock();
}
}
@SneakyThrows
public Object take() {
lock.lock();
try {
//如果队列为空,消费者等待
while (count == 0) {
notEmpty.await();
}
//模拟消费耗时
Object obj = items[takeptr];
//到达队尾,从头开始消费
if (++takeptr == items.length) {
takeptr = 0;
}
count--;
log.info("消费完成第 {} 个产品,产品总数量:{}", takeptr, count);
//通知一个生产者可以生产了
notFull.signal();
return obj;
} finally {
lock.unlock();
}
}
}
/**
* 生产者类
*/
@Data
@AllArgsConstructor
static class Producer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
Thread.sleep(500);
container.put();
}
}
}
/**
* 消费者类
*/
@Data
@AllArgsConstructor
static class Consumer implements Runnable {
private final Container container;
@SneakyThrows
@Override
public void run() {
while (true) {
Thread.sleep(500);
container.take();
}
}
}
public static void main(String[] args) {
Container container = new Container();
Thread producer1 = new Thread(new Producer(container), "生产者1");
Thread producer2 = new Thread(new Producer(container), "生产者2");
Thread producer3 = new Thread(new Producer(container), "生产者3");
Thread consumer1 = new Thread(new Consumer(container), "消费者1");
Thread consumer2 = new Thread(new Consumer(container), "消费者2");
Thread consumer3 = new Thread(new Consumer(container), "消费者3");
producer1.start();
producer2.start();
producer3.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}
可以调整生产者、消费者线程的数量观察供大于求、供小于求的运行结果;或者调整生产者线程和消费者线程sleep的时间测试同样的线程数量的情况下不同的工作效率对结果的影响。
最后,值得一提的是,Condition类的JDK源代码中在类的注释上给出了Condition类的使用方法
没错,上面的案例正是根据源码中的注释写的。
END.
注意:本文归作者所有,未经作者允许,不得转载