高阶并发编程Coding训练:N种优化哲学家就餐问题的方法
哲学家就餐问题是并发控制领域非常经典的一个问题。这个问题最初是由著名的计算机科学家Dijkstra提出的。1970年Dijkstra提出哲学家就餐问题是为了教学,它让学生实现一个多个机器访问多个磁带的程序,后来成为了哲学家就餐问题的雏形。
因为哲学家就餐问题刚好是一个多线程争夺多个资源的模型,写错了很容易产生死锁(deadlock)、活锁(livelock)等问题。另外,稍不留神,也可能写出性能问题;或者没有全面导致异常没有恢复。
这节课,刚好是在我们并发编程的尾声,我为大家增加了一个专门Coding的课。并且我尝试了5种方法,帮助大家从实战中了解如何解决并发问题。
哲学家就餐问题
有5个哲学家围成一桌吃饭。桌上有5份意大利面。还有5把叉子。如上图所示。假设哲学家必须拿到左右两个叉子才能够吃面。
哲学家平时都在思考,思考了一段时间后,会饥饿。哲学家饥饿了之后,就要去吃面。假设每个哲学家都用一个独立的线程实现,叉子本身是共享的访问资源,求设计一个算法,让哲学家们按部就班的思考和吃面?并且尽量减少哲学家们互相等待的时间。
初步协议的实现
所谓协议就是做事的规范。我们先定一个初步的协议:
- 每个哲学家都先拿左边的叉子,再拿右边的叉子、
- 如果拿不到,就等待
上面这样做会触发死锁。先带大家尝试一下。
抽象哲学家
哲学家用一个类Philosopher来表示,因为是一个线程,可以考虑继承于Runnable。在Philosopher类中。每个哲学家有一个id
,1-5代表5个哲学家。
我们设计了一个字符串的状态state,有3中情况:
- Thinking
- Hungry
- Eating
然后我们实现了thinking
方法,让哲学家休眠一小段时间。思考完之后,哲学家就饿了。
public void thinking() throws InterruptedException {
Thread.sleep((long) (Math.random()*100));
this.state = "Hungry";
}
然后我们实现了eating
方法,让哲学家进餐。进餐我们也让线程休眠一段时间,代表进餐的时间开销。
public void eating() throws InterruptedException {
this.state = "Eating";
Thread.sleep((long) (Math.random()*100));
this.state = "Thinking";
}
然后我们实现几个让哲学家拿起叉子、放下叉子的方法。takeLeft
和takeRight
代表哲学家拿起左右的叉子。它们需要一个整数的数组forks
代表叉子。叉子的编号是0-4,就是数组的脚标。
forks数组中的初始值是0,代表没有哲学家拿起。如果哲学家拿起了叉子i,就把forks[i]设置为哲学家的id,表示哲学及已经占有了叉子。
protected boolean takeLeft(int[] forks) {
return this._take(forks, this.left());
}
protected boolean takeRight(int[] forks){
return this._take(forks, this.right());
}
private boolean _take(int[] forks, int fork){
if(forks[fork] == 0) {
forks[fork] = this.id;
return true;
}
return false;
}
上面代码中的left
和right
方法在根据哲学家的id计算左右叉子的id。左叉子的id
刚好比id小1,右叉子的id刚好等于哲学家的id,但是考虑到哲学家id最大是5,因此右叉子id是id % 5
。
putLeft
和putRight
代表哲学家放下左右的叉子。
protected void putRight(int[] forks){
if(forks[this.right()] == this.id)
forks[this.right()] = 0;
}
protected void putLeft(int[] forks){
if(forks[this.left()] == this.id)
forks[this.left()] = 0;
}
时间统计
哲学家每次完成进餐,我这边会做一个时间统计。 利用了一个AtomicInteger。
static AtomicInteger total = new AtomicInteger(0);
每次完成进餐,各个线程都需要调用一次finish
方法打印结果,增加total
。计算平均每秒所有哲学家完成进餐的数量。这个统计时间对于我们衡量算法非常有帮助。
protected void finished(){
count++;
int t = total.incrementAndGet();
double speed = (t*1000.0) / (System.currentTimeMillis() - startMills);
this.state = "Thinking";
System.out.format("Philosopher %d finished %d times, speed = %.2f.\n", this.id, this.count, speed);
}
以下,是完整的程序:
public class Philosopher implements Runnable {
String state;
int id;
static AtomicInteger total = new AtomicInteger(0);
static long startMills = System.currentTimeMillis();
int count = 0;
public Philosopher(int id) {
this.id = id;
this.state = "Thinking";
}
public void thinking() throws InterruptedException {
if(this.state == "Thinking") {
Thread.sleep((long) (Math.random()*100));
this.state = "Hungry";
}
}
public void eating() throws InterruptedException {
this.state = "Eating";
Thread.sleep((long) (Math.random()*100));
this.state = "Thinking";
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
public int left() {
return this.id - 1;
}
public int right() {
return (this.id ) % 5;
}
protected boolean takeLeft(int[] forks) {
return this._take(forks, this.left());
}
protected boolean takeRight(int[] forks){
return this._take(forks, this.right());
}
private boolean _take(int[] forks, int fork){
if(forks[fork] == 0) {
forks[fork] = this.id;
return true;
}
return false;
}
protected void putRight(int[] forks){
if(forks[this.right()] == this.id)
forks[this.right()] = 0;
}
protected void putLeft(int[] forks){
if(forks[this.left()] == this.id)
forks[this.left()] = 0;
}
public boolean checkLeft(int[] forks) {
return forks[this.left()] == 0;
}
public boolean checkRight(int[] forks) {
return forks[this.right()] == 0;
}
protected void finished(){
count++;
int t = total.incrementAndGet();
double speed = (t*1000.0) / (System.currentTimeMillis() - startMills);
this.state = "Thinking";
System.out.format("Philosopher %d finished %d times, speed = %.2f.\n", this.id, this.count, speed);
}
@Override
public void run() {
throw new UnsupportedOperationException();
}
}
初步协议的实现
假设每个哲学家线程如此工作:
while (true) {
try {
this.thinking();
this.takeLeft(forks);
this.takeRight(forks);
this.eating();
this.putLeft(forks);
this.putRight(forks);
this.finished();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
上面的程序没有考虑到forks
是公共资源,因此可以重写takeLeft
和takeRight
方法,将他们作为同步方法。
@Override
protected synchronized boolean takeLeft(int[] forks) {
return super.takeLeft(forks);
}
@Override
protected synchronized boolean takeRight(int[] forks) {
return super.takeRight(forks);
}
为了实现拿不到叉子就等待的逻辑,可以考虑用while循环。下面的程序,如果拿不到叉子就让线程onSpinWait
,直到拿到叉子。
while (true) {
try {
this.thinking();
while (!this.takeLeft(forks)) {
Thread.onSpinWait();
}
while (!this.takeRight(forks)) {
Thread.onSpinWait();
}
this.eating();
this.putLeft(forks);
this.putRight(forks);
this.finished();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
上面的程序执行起来会死锁。 如果你观察不到这个效果,可能是因为两个while
循环之间时间太短。如果你想观察明显,你可以在两个while
循环间增加100ms的sleep。
while (!this.takeLeft(forks)) {
Thread.onSpinWait();
}
Thread.sleep(100);
while (!this.takeRight(forks)) {
Thread.onSpinWait();
}
死锁的原因是所有线程同时拿起左叉子,然后所有右叉子都拿不到了。
活锁(livelock)
接下来我们增加一个机制,让拿不到右叉子的线程主动放弃左叉子。
int c = 0;
while (!this.takeRight(forks)) {
c ++;
if(c > 100) {
this.putLeft(forks);
continue;
}
Thread.onSpinWait();
}
执行起来死锁问题解决了。其实,还会有一种非常小的概率触发活锁(livelock)。这种情况发生在所有哲学家拿起左叉子,然后没有等待到右叉子,然后放下左叉子——如此循环,无穷无尽。
不过从概率上说,触发活锁的情况概率非常低。所以,在很多不完善的系统中,上面的程序也就可以作为一个解决方案出现了。只不过,一旦触发风险,也是致命的。所有线程都会变成僵尸。
下面是全部的程序:
public class DiningPhilosophersDeadlock {
Phi[] phis = new Phi[5];
volatile int[] forks = new int[5];
public DiningPhilosophersDeadlock(){
for(int i = 0; i < 5; i++) {
phis[i] = new Phi(i+1);
forks[i] = 0;
}
}
class Phi extends Philosopher {
public Phi(int id) {
super(id);
}
@Override
protected synchronized boolean takeLeft(int[] forks) {
return super.takeLeft(forks);
}
@Override
protected synchronized boolean takeRight(int[] forks) {
return super.takeRight(forks);
}
@Override
protected synchronized void putLeft(int[] forks) {
super.putLeft(forks);
}
@Override
protected synchronized void putRight(int[] forks) {
super.putRight(forks);
}
@Override
public void run() {
while (true) {
try {
this.thinking();
while (!this.takeLeft(forks)) {
Thread.onSpinWait();
}
Thread.sleep(100);
int c = 0;
while (!this.takeRight(forks)) {
c ++;
if(c > 100) {
this.putLeft(forks);
continue;
}
Thread.onSpinWait();
}
this.eating();
this.putLeft(forks);
this.putRight(forks);
this.finished();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public void run(){
var pool = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; i++) {
pool.submit(phis[i]);
}
}
public static void main(String[] argv) {
var solver = new DiningPhilosophersDeadlock();
solver.run();
}
}
协议改进
一种简单的改进就是锁。下面程序我对整个处理过程简单粗暴上锁。这种方案可以解决问题。值得注意的时候,因为每次只有一个哲学家能指向下面全部的逻辑,因此这个哲学家肯定能够拿到叉子。因为同时只有一个哲学家在执行程序。
while(true) {
try {
lock.lockInterruptibly();
this.thinking();
this.takeLeft(forks);
this.takeRight(forks);
this.eating();
this.putLeft(forks);
this.putRight(forks);
this.finished();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
上面的程序速度过于慢了,我们需要减少锁的范围。一个比较好的方案,就是只锁拿叉子的环节:
while(true) {
try {
this.thinking();
lock.lockInterruptibly();
if(!(this.checkLeft(forks) && this.checkRight(forks))) {
lock.unlock();
continue;
}
this.takeLeft(forks);
this.takeRight(forks);
lock.unlock();
this.eating();
this.putLeft(forks);
this.putRight(forks);
this.finished();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
上面的程序只锁了拿叉子的环节,因此takeLeft可能会失败,这样我们可以考虑在获得锁的期间对左右叉子进行检查,也就是checkLeft
和checkRight
。上面的改动将大大提高性能。
当前算法是不是 Obstruction-Free?
Eating方法如果出错(比如延迟很长的时间),那么资源(叉子)就不能被放下;其他线程就会被阻塞(无法进步)。所以线程间不是隔离的。所以这样的设计不是Obstruction Free更加不是Lock Free。
利用条件变量
上面程序会不断的循环,让拿不到叉子的线程在条件变量上阻塞是一种更好的做法。
在构造时增加:
Condition[] waitForks = new Condition[5];
// 构造函数
for(int i = 0; i < 5; i++) {
...
+waitForks[i] = lock.newCondition();
}
然后主程序修改为:
while(true) {
try {
this.thinking();
lock.lockInterruptibly();
while(!this.takeLeft(forks)) {
waitForks[this.left()].await();
}
while(!this.takeRight(forks)) {
waitForks[this.right()].await();
}
lock.unlock();
this.eating();
lock.lockInterruptibly();
this.putLeft(forks);
waitForks[this.left()].signalAll();
this.putRight(forks);
waitForks[this.right()].signalAll();
lock.unlock();
this.finished();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
通信:进一步性能优化
进一步性能优化可以允许哲学家想周围的哲学家索要叉子(允许进程间通信)。因为有的哲学家缺右叉子会等在那里,新产生饥饿的哲学家可以索要这个哲学家的叉子。
while(true) {
try {
this.thinking();
lock.lockInterruptibly();
var takeLeft = this.checkLeft(forks);
if(!takeLeft) {
lock.unlock();
Thread.onSpinWait();
continue;
}
var takeRight = this.checkRight(forks);
if (takeRight) {
this.takeLeft(forks);
this.takeRight(forks);
} else{
this.takeLeft(forks);
var rid = this.right();
var rPhi = phis[forks[rid] - 1];
if (dirty[rid] && rPhi.getState() != "Eating") {
forks[rid] = this.id;
dirty[rid] = false;
} else {
lock.unlock();
continue;
}
}
lock.unlock();
this.eating();
lock.lockInterruptibly();
this.putLeft(forks);
this.putRight(forks);
dirty[this.left()] = true;
dirty[this.right()] = true;
lock.unlock();
this.finished();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
具体点说,就是哲学家先拿起左边的叉子,然后尝试拿右边的叉子,会遇到两种情况:
- 如果可以拿到,哲学家拿起右叉子进餐
- 如果拿不到,就向右边的哲学家索要,如果右边的哲学家不在进餐,就可以考虑把叉子传递过去
为了防止两个哲学家之间反复传递叉子,需要一个数组记录叉子的状态。这里我用了dirty
。dirty=true
代表可以传递。一开始所有叉子都是dirty=true
,当一个哲学家转让了叉子,就把dirty
设置为false
。
下面是完整的程序:
public class DiningPhilosophersWithTransfer implements Runnable {
ReentrantLock lock = new ReentrantLock();
int forks[] = new int[5];
Condition[] waitForks = new Condition[5];
Phi[] phis = new Phi[5];
boolean[] dirty = new boolean[5];
public DiningPhilosophersWithTransfer(){
for(int i = 0; i < 5; i++) {
phis[i] = new Phi(i+1);
dirty[i] = true;
waitForks[i] = lock.newCondition();
}
}
class Phi extends Philosopher {
public Phi(int id) {
super(id);
}
@Override
public void run() {
while(true) {
try {
this.thinking();
lock.lockInterruptibly();
while(!this.takeLeft(forks)) {
waitForks[this.left()].await();
}
while(!this.takeRight(forks)) {
var rid = this.right();
var rightPhi = phis[forks[rid] -1];
if(rightPhi.getState() != "Eating" && dirty[rid] == true) {
forks[rid] = this.id;
dirty[rid] = false;
break;
}
waitForks[this.right()].await();
}
lock.unlock();
this.eating();
lock.lockInterruptibly();
this.putLeft(forks);
waitForks[this.left()].signalAll();
this.putRight(forks);
waitForks[this.right()].signalAll();
dirty[this.left()] = true;
dirty[this.right()] = true;
lock.unlock();
this.finished();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Override
public void run() {
//var pool = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; i++) {
new Thread(phis[i]).start();
}
}
public static void main(String[] argv) {
var solver = new DiningPhilosophersWithTransfer();
solver.run();
}
}
另一种解法:阻塞队列
上面介绍的几种方法,需要思考什么地方上锁,什么地方解锁。需要小心翼翼的维护状态,对写程序负担太大。因此,我们可以考虑用一种更简单、高效,且性能更好的方法,就是利用阻塞队列。
这里我构造一个控制线程,负责设置叉子的状态。然后再构造5个工作线程,用于承担哲学家thinking
和eating
的开销。
这样架构的一个主要原因是设置状态的线程本身开销很轻;工作线程开销较大。
控制线程我们称为竞争管理器:ContentionManager,代码如下:
class ContentionManager implements Runnable {
@Override
public void run() {
while(true) {
try {
var phi = managerQueue.take();
if(phi.checkLeft(forks) && phi.checkRight(forks)) {
phi.takeLeft(forks);
phi.takeRight(forks);
workingQueue.offer(phi);
} else {
managerQueue.offer(phi);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
控制线程每次从managerQueue
中取出一个哲学家,然后判断这个哲学家可以不可以拿起左右的叉子,如果可以就将哲学家放入工作线程。如果不行,就让哲学家在控制线程中重新排队。
工作线程Worker承载哲学家的工作,代码如下:
class Worker implements Runnable {
@Override
public void run() {
while(true) {
Philosopher phi = null;
try {
phi = workingQueue.take();
if(phi.getState() == "Hungry") {
phi.eating();
phi.putLeft(forks);
phi.putRight(forks);
phi.finished();
workingQueue.offer(phi);
} else {
phi.thinking();
managerQueue.offer(phi);
}
} catch (InterruptedException e) {
// 见下文讨论
}
}
}
}
工作线程遇到哲学家思考的状态,就让哲学家思考;思考结束后,将哲学家放入管理队列交给竞争管理器处理(ContentionManager)。
构造异常处理
还有一个比较有趣的问题是如果进餐有10%的失败概率,失败后哲学家会陷入一个非常长时间的休眠。这个时候,我们可以考虑实现一个延迟中断队列,最终的代码如下:
package concurrent;
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
public class DiningPhilosophersBlockingQueue implements Runnable {
Philosopher[] phis;
volatile int forks[];
LinkedBlockingQueue<Philosopher> workingQueue;
LinkedBlockingQueue<Philosopher> managerQueue;
DelayQueue<DelayInterruptingThread> delayQueue = new DelayQueue<>();
class DelayInterruptingThread implements Delayed{
long time;
Thread current;
public DelayInterruptingThread(Thread t, long delay) {
this.current = t;
this.time = System.currentTimeMillis() + delay;
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
return (int) (time - ((DelayInterruptingThread)o).time);
}
public void rollback() {
if(this.current != null) {
this.current.interrupt();
}
}
public void commit() {
this.current = null;
}
}
class Worker implements Runnable {
@Override
public void run() {
while(true) {
Philosopher phi = null;
try {
phi = workingQueue.take();
if(phi.getState() == "Hungry") {
var delayItem = new DelayInterruptingThread(Thread.currentThread(), 1000);
delayQueue.offer(delayItem);
phi.eating();
delayItem.commit();
phi.putLeft(forks);
phi.putRight(forks);
phi.finished();
workingQueue.offer(phi);
} else {
phi.thinking();
managerQueue.offer(phi);
}
} catch (InterruptedException e) {
if(phi != null) {
// Rollback Phi. State
phi.putLeft(forks);
phi.putRight(forks);
if(phi.getState() == "Eating") {
phi.setState("Hungry");
}
managerQueue.offer(phi);
}
}
}
}
}
class InterruptingWorker implements Runnable {
@Override
public void run() {
while(true) {
try {
DelayInterruptingThread delayed = (DelayInterruptingThread) delayQueue.take();
delayed.rollback();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class ContentionManager implements Runnable {
@Override
public void run() {
while(true) {
try {
var phi = managerQueue.take();
if(phi.checkLeft(forks) && phi.checkRight(forks)) {
phi.takeLeft(forks);
phi.takeRight(forks);
workingQueue.offer(phi);
} else {
managerQueue.offer(phi);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public DiningPhilosophersBlockingQueue() {
phis = new Philosopher[5];
forks = new int[5];
workingQueue = new LinkedBlockingQueue<>();
managerQueue = new LinkedBlockingQueue<>();
for(int i = 0; i < 5; i++) {
phis[i] = new Philosopher(i+1);
workingQueue.offer(phis[i]);
}
}
public void run(){
var pool = Executors.newFixedThreadPool(7);
for(int i = 0; i < 5; i++) {
pool.submit(new Worker());
}
pool.submit(new ContentionManager());
pool.submit(new InterruptingWorker());
}
public static void main(String[] argv) {
var solver = new DiningPhilosophersBlockingQueue();
solver.run();
}
}
如果感兴趣性能比较,可以看我的视频。里面有完整的line-by-line的讲解和演示。
总结
这节课我们学习了哲学家就餐问题的写法。
从最简单的上锁,到减少锁的范围,到利用条件变量节省循环等待时间,到利用哲学家之间通信提升对资源的利用效率,到利用阻塞队列减少程序负担——这些都是常规手段。
最后,我想说的是:高手,可能会跳出这些常规手段,利用算法,有可能构造出更快的方案,直接解决哲学家就餐问题。不过对于我们这个阶段,哪怕是想拿到架构师的Title,我们还是先从掌握所有常规手段入手。将来到了架构师岗位上,还能继续深造。
下一节课,我们将讨论一些并发领域的补充内容。