Skip to content

高阶并发编程Coding训练:N种优化哲学家就餐问题的方法

哲学家就餐问题是并发控制领域非常经典的一个问题。这个问题最初是由著名的计算机科学家Dijkstra提出的。1970年Dijkstra提出哲学家就餐问题是为了教学,它让学生实现一个多个机器访问多个磁带的程序,后来成为了哲学家就餐问题的雏形。

因为哲学家就餐问题刚好是一个多线程争夺多个资源的模型,写错了很容易产生死锁(deadlock)、活锁(livelock)等问题。另外,稍不留神,也可能写出性能问题;或者没有全面导致异常没有恢复。

这节课,刚好是在我们并发编程的尾声,我为大家增加了一个专门Coding的课。并且我尝试了5种方法,帮助大家从实战中了解如何解决并发问题。

哲学家就餐问题

image-20210103224453756

有5个哲学家围成一桌吃饭。桌上有5份意大利面。还有5把叉子。如上图所示。假设哲学家必须拿到左右两个叉子才能够吃面。

哲学家平时都在思考,思考了一段时间后,会饥饿。哲学家饥饿了之后,就要去吃面。假设每个哲学家都用一个独立的线程实现,叉子本身是共享的访问资源,求设计一个算法,让哲学家们按部就班的思考和吃面?并且尽量减少哲学家们互相等待的时间。

初步协议的实现

所谓协议就是做事的规范。我们先定一个初步的协议:

  1. 每个哲学家都先拿左边的叉子,再拿右边的叉子、
  2. 如果拿不到,就等待

上面这样做会触发死锁。先带大家尝试一下。

抽象哲学家

哲学家用一个类Philosopher来表示,因为是一个线程,可以考虑继承于Runnable。在Philosopher类中。每个哲学家有一个id ,1-5代表5个哲学家。

我们设计了一个字符串的状态state,有3中情况:

  • Thinking
  • Hungry
  • Eating

然后我们实现了thinking 方法,让哲学家休眠一小段时间。思考完之后,哲学家就饿了。

public void thinking() throws InterruptedException {
    Thread.sleep((long) (Math.random()*100));
    this.state = "Hungry";
}

然后我们实现了eating 方法,让哲学家进餐。进餐我们也让线程休眠一段时间,代表进餐的时间开销。

public void eating() throws InterruptedException {
    this.state = "Eating";  
    Thread.sleep((long) (Math.random()*100));
    this.state = "Thinking";
}

然后我们实现几个让哲学家拿起叉子、放下叉子的方法。takeLefttakeRight 代表哲学家拿起左右的叉子。它们需要一个整数的数组forks 代表叉子。叉子的编号是0-4,就是数组的脚标。

forks数组中的初始值是0,代表没有哲学家拿起。如果哲学家拿起了叉子i,就把forks[i]设置为哲学家的id,表示哲学及已经占有了叉子。

protected boolean takeLeft(int[] forks) {
    return this._take(forks, this.left());
}

protected boolean takeRight(int[] forks){
    return this._take(forks, this.right());
}

private boolean _take(int[] forks, int fork){
    if(forks[fork] == 0) {
        forks[fork] = this.id;
        return true;
    }
    return false;

}

上面代码中的leftright 方法在根据哲学家的id计算左右叉子的id。左叉子的id 刚好比id小1,右叉子的id刚好等于哲学家的id,但是考虑到哲学家id最大是5,因此右叉子id是id % 5

putLeftputRight 代表哲学家放下左右的叉子。

protected void putRight(int[] forks){
    if(forks[this.right()] == this.id)
        forks[this.right()] = 0;
}

protected void putLeft(int[] forks){
    if(forks[this.left()] == this.id)
        forks[this.left()] = 0;
}

时间统计

哲学家每次完成进餐,我这边会做一个时间统计。 利用了一个AtomicInteger。

 static AtomicInteger total = new AtomicInteger(0);

每次完成进餐,各个线程都需要调用一次finish 方法打印结果,增加total 。计算平均每秒所有哲学家完成进餐的数量。这个统计时间对于我们衡量算法非常有帮助。

protected void finished(){
    count++;
    int t = total.incrementAndGet();
    double speed = (t*1000.0) / (System.currentTimeMillis() - startMills);
    this.state = "Thinking";
    System.out.format("Philosopher %d finished %d times, speed = %.2f.\n", this.id, this.count, speed);
}

以下,是完整的程序:

public class Philosopher implements Runnable {

    String state;
    int id;
    static AtomicInteger total = new AtomicInteger(0);
    static long startMills = System.currentTimeMillis();
    int count = 0;

    public Philosopher(int id) {
        this.id = id;
        this.state = "Thinking";
    }

    public void thinking() throws InterruptedException {
        if(this.state == "Thinking") {            
            Thread.sleep((long) (Math.random()*100));
            this.state = "Hungry";
        }
    }


    public void eating() throws InterruptedException {
        this.state = "Eating";  
        Thread.sleep((long) (Math.random()*100));
        this.state = "Thinking";
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public int left() {
        return this.id - 1;
    }

    public int right() {
        return (this.id ) % 5;
    }

    protected boolean takeLeft(int[] forks) {
        return this._take(forks, this.left());
    }

    protected boolean takeRight(int[] forks){
        return this._take(forks, this.right());
    }

    private boolean _take(int[] forks, int fork){
       if(forks[fork] == 0) {
           forks[fork] = this.id;
           return true;
       }
       return false;

    }

    protected void putRight(int[] forks){
        if(forks[this.right()] == this.id)
        forks[this.right()] = 0;
    }

    protected void putLeft(int[] forks){
        if(forks[this.left()] == this.id)
            forks[this.left()] = 0;
    }

    public boolean checkLeft(int[] forks) {
        return forks[this.left()] == 0;
    }

    public boolean checkRight(int[] forks) {
        return forks[this.right()] == 0;
    }

    protected void finished(){
        count++;
        int t = total.incrementAndGet();
        double speed = (t*1000.0) / (System.currentTimeMillis() - startMills);
        this.state = "Thinking";
        System.out.format("Philosopher %d finished %d times, speed = %.2f.\n", this.id, this.count, speed);
    }

    @Override
    public void run() {
        throw new UnsupportedOperationException();
    }
}

初步协议的实现

假设每个哲学家线程如此工作:

while (true) {
    try {
        this.thinking();
        this.takeLeft(forks);
        this.takeRight(forks);
        this.eating();
        this.putLeft(forks);
        this.putRight(forks);
        this.finished();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

上面的程序没有考虑到forks 是公共资源,因此可以重写takeLefttakeRight 方法,将他们作为同步方法。

@Override
protected synchronized boolean takeLeft(int[] forks) {
    return super.takeLeft(forks);
}

@Override
protected synchronized boolean takeRight(int[] forks) {
    return super.takeRight(forks);
}

为了实现拿不到叉子就等待的逻辑,可以考虑用while循环。下面的程序,如果拿不到叉子就让线程onSpinWait ,直到拿到叉子。

 while (true) {
     try {
         this.thinking();
         while (!this.takeLeft(forks)) {
             Thread.onSpinWait();
         }
         while (!this.takeRight(forks)) {
             Thread.onSpinWait();
         }

         this.eating();
         this.putLeft(forks);
         this.putRight(forks);
         this.finished();
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
 }

上面的程序执行起来会死锁。 如果你观察不到这个效果,可能是因为两个while 循环之间时间太短。如果你想观察明显,你可以在两个while 循环间增加100ms的sleep。

while (!this.takeLeft(forks)) {
    Thread.onSpinWait();
}
Thread.sleep(100);
while (!this.takeRight(forks)) {
    Thread.onSpinWait();
}

死锁的原因是所有线程同时拿起左叉子,然后所有右叉子都拿不到了。

活锁(livelock)

接下来我们增加一个机制,让拿不到右叉子的线程主动放弃左叉子。

int c = 0;
while (!this.takeRight(forks)) {
    c ++;
    if(c > 100) {
        this.putLeft(forks);
        continue;
    }
    Thread.onSpinWait();
}

执行起来死锁问题解决了。其实,还会有一种非常小的概率触发活锁(livelock)。这种情况发生在所有哲学家拿起左叉子,然后没有等待到右叉子,然后放下左叉子——如此循环,无穷无尽。

不过从概率上说,触发活锁的情况概率非常低。所以,在很多不完善的系统中,上面的程序也就可以作为一个解决方案出现了。只不过,一旦触发风险,也是致命的。所有线程都会变成僵尸。

下面是全部的程序:

public class DiningPhilosophersDeadlock {

    Phi[] phis = new Phi[5];
    volatile int[] forks = new int[5];

    public DiningPhilosophersDeadlock(){
        for(int i = 0; i < 5; i++) {
            phis[i] = new Phi(i+1);
            forks[i] = 0;
        }
    }

    class Phi extends Philosopher {

        public Phi(int id) {
            super(id);
        }

        @Override
        protected synchronized boolean takeLeft(int[] forks) {
            return super.takeLeft(forks);
        }

        @Override
        protected synchronized boolean takeRight(int[] forks) {
            return super.takeRight(forks);
        }

        @Override
        protected synchronized void putLeft(int[] forks) {
            super.putLeft(forks);
        }

        @Override
        protected synchronized void putRight(int[] forks) {
            super.putRight(forks);
        }

        @Override
        public void run() {
            while (true) {
                try {
                    this.thinking();
                    while (!this.takeLeft(forks)) {
                        Thread.onSpinWait();
                    }

                    Thread.sleep(100);
                    int c = 0;
                    while (!this.takeRight(forks)) {
                        c ++;
                        if(c > 100) {
                            this.putLeft(forks);
                            continue;
                        }
                        Thread.onSpinWait();
                    }

                    this.eating();
                    this.putLeft(forks);
                    this.putRight(forks);
                    this.finished();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }


    public void run(){
        var pool = Executors.newFixedThreadPool(5);
        for(int i = 0; i < 5; i++) {
            pool.submit(phis[i]);
        }
    }

    public static void main(String[] argv) {
        var solver = new DiningPhilosophersDeadlock();
        solver.run();

    }
}

协议改进

一种简单的改进就是锁。下面程序我对整个处理过程简单粗暴上锁。这种方案可以解决问题。值得注意的时候,因为每次只有一个哲学家能指向下面全部的逻辑,因此这个哲学家肯定能够拿到叉子。因为同时只有一个哲学家在执行程序。

 while(true) {
     try {
         lock.lockInterruptibly();
         this.thinking();
         this.takeLeft(forks);
         this.takeRight(forks);
         this.eating();
         this.putLeft(forks);
         this.putRight(forks);
         this.finished();
     } catch (InterruptedException e) {
         e.printStackTrace();
     }finally {
         lock.unlock();
     }
 }         

上面的程序速度过于慢了,我们需要减少锁的范围。一个比较好的方案,就是只锁拿叉子的环节:

while(true) {
    try {
        this.thinking();
        lock.lockInterruptibly();
        if(!(this.checkLeft(forks) && this.checkRight(forks))) {
            lock.unlock();
            continue;
        }
        this.takeLeft(forks);
        this.takeRight(forks);
        lock.unlock();

        this.eating();
        this.putLeft(forks);
        this.putRight(forks);
        this.finished();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

上面的程序只锁了拿叉子的环节,因此takeLeft可能会失败,这样我们可以考虑在获得锁的期间对左右叉子进行检查,也就是checkLeftcheckRight 。上面的改动将大大提高性能。

当前算法是不是 Obstruction-Free?

Eating方法如果出错(比如延迟很长的时间),那么资源(叉子)就不能被放下;其他线程就会被阻塞(无法进步)。所以线程间不是隔离的。所以这样的设计不是Obstruction Free更加不是Lock Free。

利用条件变量

上面程序会不断的循环,让拿不到叉子的线程在条件变量上阻塞是一种更好的做法。

在构造时增加:

Condition[] waitForks = new Condition[5];
// 构造函数
for(int i = 0; i < 5; i++) {
        ...
    +waitForks[i] = lock.newCondition();
}

然后主程序修改为:

  while(true) {
      try {
          this.thinking();
          lock.lockInterruptibly();
          while(!this.takeLeft(forks)) {
              waitForks[this.left()].await();
          }
          while(!this.takeRight(forks)) {
              waitForks[this.right()].await();
          }
          lock.unlock();

          this.eating();
          lock.lockInterruptibly();
          this.putLeft(forks);
          waitForks[this.left()].signalAll();
          this.putRight(forks);
          waitForks[this.right()].signalAll();
          lock.unlock();
          this.finished();
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }

通信:进一步性能优化

进一步性能优化可以允许哲学家想周围的哲学家索要叉子(允许进程间通信)。因为有的哲学家缺右叉子会等在那里,新产生饥饿的哲学家可以索要这个哲学家的叉子。

            while(true) {
                try {
                    this.thinking();
                    lock.lockInterruptibly();

                    var takeLeft = this.checkLeft(forks);
                    if(!takeLeft) {
                        lock.unlock();
                        Thread.onSpinWait();
                        continue;
                    }
                    var takeRight = this.checkRight(forks);

                    if (takeRight) {
                        this.takeLeft(forks);
                        this.takeRight(forks);
                    } else{
                        this.takeLeft(forks);
                        var rid = this.right();
                        var rPhi = phis[forks[rid] - 1];
                        if (dirty[rid] && rPhi.getState() != "Eating") {
                            forks[rid] = this.id;
                            dirty[rid] = false;
                        } else {
                            lock.unlock();
                            continue;
                        }
                    }
                    lock.unlock();
                    this.eating();

                    lock.lockInterruptibly();
                    this.putLeft(forks);
                    this.putRight(forks);
                    dirty[this.left()] = true;
                    dirty[this.right()] = true;

                    lock.unlock();
                    this.finished();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } 
            }
        }

具体点说,就是哲学家先拿起左边的叉子,然后尝试拿右边的叉子,会遇到两种情况:

  1. 如果可以拿到,哲学家拿起右叉子进餐
  2. 如果拿不到,就向右边的哲学家索要,如果右边的哲学家不在进餐,就可以考虑把叉子传递过去

为了防止两个哲学家之间反复传递叉子,需要一个数组记录叉子的状态。这里我用了dirtydirty=true 代表可以传递。一开始所有叉子都是dirty=true ,当一个哲学家转让了叉子,就把dirty设置为false

下面是完整的程序:

public class DiningPhilosophersWithTransfer implements Runnable {

    ReentrantLock lock = new ReentrantLock();
    int forks[] = new int[5];
    Condition[] waitForks = new Condition[5];
    Phi[] phis = new Phi[5];
    boolean[] dirty = new boolean[5];
    public DiningPhilosophersWithTransfer(){
        for(int i = 0; i < 5; i++) {
            phis[i] = new Phi(i+1);
            dirty[i] = true;
            waitForks[i] = lock.newCondition();
        }
    }

    class Phi extends Philosopher {

        public Phi(int id) {
            super(id);
        }

        @Override
        public void run() {
            while(true) {
                try {
                    this.thinking();
                    lock.lockInterruptibly();
                    while(!this.takeLeft(forks)) {
                        waitForks[this.left()].await();
                    }
                    while(!this.takeRight(forks)) {
                        var rid = this.right();
                        var rightPhi = phis[forks[rid] -1];

                        if(rightPhi.getState() != "Eating" && dirty[rid] == true) {
                            forks[rid] = this.id;
                            dirty[rid] = false;
                            break;
                        }
                        waitForks[this.right()].await();
                    }
                    lock.unlock();

                    this.eating();
                    lock.lockInterruptibly();
                    this.putLeft(forks);
                    waitForks[this.left()].signalAll();
                    this.putRight(forks);
                    waitForks[this.right()].signalAll();
                    dirty[this.left()] = true;
                    dirty[this.right()] = true;
                    lock.unlock();
                    this.finished();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void run() {
        //var pool = Executors.newFixedThreadPool(5);
        for(int i = 0; i < 5; i++) {
            new Thread(phis[i]).start();
        }
    }

    public static void main(String[] argv) {
        var solver = new DiningPhilosophersWithTransfer();
        solver.run();

    }
}

另一种解法:阻塞队列

上面介绍的几种方法,需要思考什么地方上锁,什么地方解锁。需要小心翼翼的维护状态,对写程序负担太大。因此,我们可以考虑用一种更简单、高效,且性能更好的方法,就是利用阻塞队列。

这里我构造一个控制线程,负责设置叉子的状态。然后再构造5个工作线程,用于承担哲学家thinkingeating 的开销。

这样架构的一个主要原因是设置状态的线程本身开销很轻;工作线程开销较大。

控制线程我们称为竞争管理器:ContentionManager,代码如下:

class ContentionManager implements Runnable {

    @Override
    public void run() {
        while(true) {
            try {
                var phi = managerQueue.take();
                if(phi.checkLeft(forks) && phi.checkRight(forks)) {
                    phi.takeLeft(forks);
                    phi.takeRight(forks);
                    workingQueue.offer(phi);
                } else {
                    managerQueue.offer(phi);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

控制线程每次从managerQueue 中取出一个哲学家,然后判断这个哲学家可以不可以拿起左右的叉子,如果可以就将哲学家放入工作线程。如果不行,就让哲学家在控制线程中重新排队。

工作线程Worker承载哲学家的工作,代码如下:

  class Worker implements Runnable {
        @Override
        public void run() {
            while(true) {
                Philosopher phi = null;
                try {
                    phi = workingQueue.take();
                    if(phi.getState() == "Hungry") {
                        phi.eating();
                        phi.putLeft(forks);
                        phi.putRight(forks);
                        phi.finished();
                        workingQueue.offer(phi);
                    } else {
                        phi.thinking();
                        managerQueue.offer(phi);
                    }
                } catch (InterruptedException e) {

                                        // 见下文讨论
                }
            }
        }
    }

工作线程遇到哲学家思考的状态,就让哲学家思考;思考结束后,将哲学家放入管理队列交给竞争管理器处理(ContentionManager)。

构造异常处理

还有一个比较有趣的问题是如果进餐有10%的失败概率,失败后哲学家会陷入一个非常长时间的休眠。这个时候,我们可以考虑实现一个延迟中断队列,最终的代码如下:

package concurrent;

import java.util.Arrays;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;

public class DiningPhilosophersBlockingQueue implements Runnable {

    Philosopher[] phis;
    volatile int forks[];
    LinkedBlockingQueue<Philosopher> workingQueue;
    LinkedBlockingQueue<Philosopher> managerQueue;
    DelayQueue<DelayInterruptingThread> delayQueue = new DelayQueue<>();

    class DelayInterruptingThread implements Delayed{

        long time;
        Thread current;

        public DelayInterruptingThread(Thread t, long delay) {
            this.current = t;
            this.time = System.currentTimeMillis() + delay;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return time - System.currentTimeMillis();
        }

        @Override
        public int compareTo(Delayed o) {
            return (int) (time - ((DelayInterruptingThread)o).time);
        }

        public void rollback() {
            if(this.current != null) {
                this.current.interrupt();
            }
        }

        public void commit() {
            this.current = null;
        }
    }



    class Worker implements Runnable {
        @Override
        public void run() {
            while(true) {
                Philosopher phi = null;
                try {
                    phi = workingQueue.take();
                    if(phi.getState() == "Hungry") {
                        var delayItem = new DelayInterruptingThread(Thread.currentThread(), 1000);
                        delayQueue.offer(delayItem);
                        phi.eating();
                        delayItem.commit();
                        phi.putLeft(forks);
                        phi.putRight(forks);
                        phi.finished();
                        workingQueue.offer(phi);
                    } else {
                        phi.thinking();
                        managerQueue.offer(phi);
                    }
                } catch (InterruptedException e) {

                    if(phi != null) {
                        // Rollback Phi. State
                        phi.putLeft(forks);
                        phi.putRight(forks);
                        if(phi.getState() == "Eating") {
                            phi.setState("Hungry");
                        }
                        managerQueue.offer(phi);
                    }
                }
            }
        }
    }

    class InterruptingWorker implements Runnable {

        @Override
        public void run() {
            while(true) {
                try {
                    DelayInterruptingThread delayed = (DelayInterruptingThread) delayQueue.take();
                    delayed.rollback();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        }
    }

    class ContentionManager implements Runnable {

        @Override
        public void run() {
            while(true) {
                try {
                    var phi = managerQueue.take();
                    if(phi.checkLeft(forks) && phi.checkRight(forks)) {
                        phi.takeLeft(forks);
                        phi.takeRight(forks);
                        workingQueue.offer(phi);
                    } else {
                        managerQueue.offer(phi);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }


    public DiningPhilosophersBlockingQueue() {
        phis = new Philosopher[5];
        forks = new int[5];
        workingQueue = new LinkedBlockingQueue<>();
        managerQueue = new LinkedBlockingQueue<>();
        for(int i = 0; i < 5; i++) {
            phis[i] = new Philosopher(i+1);
            workingQueue.offer(phis[i]);
        }
    }



    public void run(){
        var pool = Executors.newFixedThreadPool(7);
        for(int i = 0; i < 5; i++) {
            pool.submit(new Worker());
        }
        pool.submit(new ContentionManager());
        pool.submit(new InterruptingWorker());
    }


    public static void main(String[] argv) {

        var solver = new DiningPhilosophersBlockingQueue();
        solver.run();
    }
}

如果感兴趣性能比较,可以看我的视频。里面有完整的line-by-line的讲解和演示。

总结

这节课我们学习了哲学家就餐问题的写法。

从最简单的上锁,到减少锁的范围,到利用条件变量节省循环等待时间,到利用哲学家之间通信提升对资源的利用效率,到利用阻塞队列减少程序负担——这些都是常规手段。

最后,我想说的是:高手,可能会跳出这些常规手段,利用算法,有可能构造出更快的方案,直接解决哲学家就餐问题。不过对于我们这个阶段,哪怕是想拿到架构师的Title,我们还是先从掌握所有常规手段入手。将来到了架构师岗位上,还能继续深造。

下一节课,我们将讨论一些并发领域的补充内容。

文章来源于自己总结和网络转载,内容如有任何问题,请大佬斧正!联系我