Java高级 – 多线程必知必会

Java高级 – 多线程必知必会

什么是线程

在电脑打开任务管理器,我们能看到一个个的进程,这些就是计算机运行中的程序。而线程,则是进程中的一个实体,换句话说,线程是依附进程而存在的。从操作系统的角度来说,线程就是CPU分配的基本单位。从JVM的角度来说,它可以被看作是执行Java代码的最小单位。我们在执行main()方法的时候其实就是启动了一个JVM的进程,而main()方法所在的线程就是这个进程中的主线程

由于线程是进程的一个实体,因此一个进程可以包含多个线程,多个线程之间共享进程的堆和方法区,但是每个线程都有自己的程序计数器和栈

创建一个线程

Java中有三种方法创建线程,分别是:

  • 实现Runnable接口
  • 继承Thread类
  • 使用FutureTask
package foundation;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * @author : BlueLzy
 * @email : bluehobert@gmail.com
 * @date : 2020/07/16
 * @desc : 创建线程的三种方法 - Runnable / Thread / FutureTask
 */
public class CreateAThread {

    public static class RunnableTest implements Runnable {

        @Override
        public void run() {
            System.out.println("Create by Runnable");
        }
    }

    public static class ThreadTest extends Thread {
        @Override
        public void run() {
            super.run();
            System.out.println("Create by extend Thread");
        }
    }

    public static class CallbackTest implements Callable<String> {

        @Override
        public String call() {
            return "Create by Callback";
        }
    }

    public static void main(String[] args) {

        // Runnable
        RunnableTest runnableTest = new RunnableTest();
        new Thread(runnableTest).start();

        // Thread
        ThreadTest test = new ThreadTest();
        test.start();

        // FutureTask
        FutureTask<String> futureTask = new FutureTask<>(new CallbackTest());
        new Thread(futureTask).start();
        try {
            String result = futureTask.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

}

运行结果:

Create by Runnable
Create by extend Thread
Create by Callback

这三种方式的区别:

  • Runnable是接口,Thread是类,因为Java单继承的关系,接口一般比类耦合度更低
  • Runnable和Thread没有返回值,FutureTask可以通过futureTask.get()获取返回值

线程的状态和切换

我们首先说一下线程的几个状态:

  • NEW:一个刚创建又未启动的线程处于这个状态。由于一个线程只能被启动一次,因此NEW状态在这个线程中只会出现一次。
  • RUNNABLE:这个状态包括两个子状态:READY和RUNNING。
  • BLOCKED:处于该状态表示线程是阻塞状态,一般是发起了一个阻塞操作或者尝试获取其他线程持有的锁的时候
  • WAITING:调用了wait, join, LockSupport.park方法可以让线程处于该状态,这是无线等待被唤醒的状态。调用了notify\ notifyAll和LockSupport.unpark方法可以让线程从WAITING切换到RUNNABLE
  • TIMED_WAITING: 和WAITING类似,区别在于这是有时间显示的等待状态。到达指定时间限制后会切换到RUNNABLE
  • TERMINATED:执行结束的线程状态。无论是run方法正常返回,还是抛出异常结束,都会让线程处于这个状态。

我们可以看到,除了NEW和TERMINATED这两个状态只能出现一次,其他状态之间都是可以相互切换的。因此也就需要说到我们经常听到的:上下文信息。这个词是什么意思呢?例如现在线程A正在执行,然后突然线程B过来了,告诉A:你先停一下,我这个任务更重要。于是线程A从RUNNABLE中的RUNNING状态,切换为WAITING状态,这个时候我们需要把线程的上下文信息保存下来,方便待会A继续执行。

上下文信息:包括CPU的寄存器和程序计数器在某一时间的内容。

由于切换上下文需要包括和恢复所需数据,所以这个操作是会给CPU带来额外的开销。这个在多线程编程的时候也是我们需要注意的地方。

Object.wait()

当一个线程调用一个共享变量的wait()方法时,该调用线程会被阻塞挂起,直到被重新唤醒或者抛出异常被终止。

举个例子:

package foundation;

import java.util.LinkedList;
import java.util.Queue;

/**
 * @author : BlueLzy
 * @email : bluehobert@gmail.com
 * @date : 2020/07/16
 * @desc : 
 */
public class WaitDemo {
    static final Queue<Integer> queue = new LinkedList<>();
    private static final int MAX_SIZE = 5;

    public static void main(String[] args) {
        synchronized(queue) {
            while (queue.size() == MAX_SIZE) {
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            queue.add(1);
            System.out.println("--- add: " + queue.size());
            queue.notifyAll();
        }

        synchronized (queue) {
            while (queue.isEmpty()) {
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            queue.remove(1);
            System.out.println("--- remove: " + queue.size());
            queue.notifyAll();
        }
    }

}

输出结果:

--- add: 1
--- remove: 0

我们模拟生产者+消费者的模式,通过wait()让线程挂起阻塞,然后再通过notifyAll() 唤醒等待的线程。

当然,上面的代码并不能真正达到不停生产+消费的效果,因为我们的线程在执行一次之后就结束了。

我们通过另外一个wait() / notify() 的例子来看看真正的生产者+消费者模式是怎么样的:

import java.util.concurrent.ConcurrentHashMap;

public class ComsumerAndProvider {
    private static final Object lock = new Object();
    private static final int MAX_SIZE = 10;
    private static int count;

    static class Comsumer implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (lock) {
                    try {

                        while (count <= 0) {
                            System.out.println("--- empty");
                            lock.wait();
                        }
                        count--;
                        System.out.println(Thread.currentThread().getName() + " comsumer: " + count);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lock.notifyAll();
                }
            }
        }
    }

    static class Provider implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (lock) {
                    try {

                        while (count >= MAX_SIZE) {
                            System.out.println("--- full");
                            lock.wait();
                        }
                        count++;
                        System.out.println(Thread.currentThread().getName() + " provider: " + count);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lock.notifyAll();
                }
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new Provider()).start();
        new Thread(new Provider()).start();
        new Thread(new Provider()).start();

        new Thread(new Comsumer()).start();
        new Thread(new Comsumer()).start();
        new Thread(new Comsumer()).start();
    }

}

在上面这段代码里面,我们在生产者和消费者中都加入了while(true) 这个死循环,保证生产者和消费者都是不断执行的,这样打印出来的结果就是:

Thread-1 provider: 1
Thread-0 provider: 2
Thread-2 provider: 3
Thread-0 provider: 4
Thread-1 provider: 5
Thread-2 provider: 6
Thread-1 provider: 7
Thread-0 provider: 8
Thread-2 provider: 9
Thread-5 comsumer: 8
Thread-3 comsumer: 7
Thread-4 comsumer: 6
Thread-0 provider: 7
Thread-1 provider: 8
Thread-2 provider: 9
Thread-1 provider: 10
--- full
--- full
Thread-4 comsumer: 9
Thread-2 provider: 10

我们同时启动了3个生产者和3个消费者的线程,但是生产者的速度比消费者快,因此后面会出现满状态,这个时候生产者就会调用wait()方法挂起,等待消费者消费之后被唤醒,再继续生产。

Thread.interrupt()

这个方法用于中断线程。举个例子:

package foundation;

/**
 * @author : BlueLzy
 * @email : bluehobert@gmail.com
 * @date : 2020/07/18
 * @desc :
 */
public class WaitNotifyInterupt {
    static Object obj = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("----begin-------");
                synchronized (obj) {
                    try {
                        obj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println("----end------");
            }
        });

        thread.start();

        Thread.sleep(1000);

        System.out.println("-------- begin interrupt threadA------");
        thread.interrupt();
        System.out.println("--------- end interrupt threadA--------");
    }
}

输出结果:

----begin-------
-------- begin interrupt threadA------
--------- end interrupt threadA--------
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at foundation.WaitNotifyInterupt$1.run(WaitNotifyInterupt.java:19)
    at java.lang.Thread.run(Thread.java:748)
----end------

调用了start()sleep(1000) 这两个方法之后,我们通过interrupt() 方法让线程(threadA)中断,于是会抛出InterruptedException异常。

Object.notify()

调用了共享对象的notify()方法之后,会唤醒一个在该共享变量上调用wait方法之后被挂起的线程。一个共享变量可能会有多个线程在等待,具体唤醒哪个等待线程是随机的,由JVM来决定。

类似wait方法,只有当前线程获取到了共享变量的监视器锁之后,才能调用共享变量的notify方法。否则会抛出IllegalMonitorStateException异常。

Object.notifyAll()

和notify方法类似,ntofiyAll()方法会唤醒当前共享变量所有在挂起等待的线程。

举个例子:

package foundation;

/**
 * @author : BlueLzy
 * @email : bluehobert@gmail.com
 * @date : 2020/07/18
 * @desc :
 */
public class NotifyAllDemo {

    private static volatile Object resourceA = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (resourceA) {
                    System.out.println("threadA get resourceA lock");

                    try {
                        System.out.println("threadA begin wait");
                        resourceA.wait();
                        System.out.println("threadA end wait");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (resourceA) {
                    System.out.println("threadB get resourceA lock");
                    try {
                        System.out.println("threadB begin wait");
                        resourceA.wait();
                        System.out.println("threadB end wait");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

            Thread threadC = new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (resourceA) {
                        System.out.println("threadC begin notify");
                        resourceA.notify();
                    }
                }
            });

        threadA.start();
        threadB.start();

        Thread.sleep(1000);
        threadC.start();

        threadA.join();
        threadB.join();
        threadC.join();

        System.out.println("over");
    }
}

输出结果:

threadA get resourceA lock
threadA begin wait
threadB get resourceA lock
threadB begin wait
threadC begin notify
threadA end wait

从输出结果我们可以看到,线程A首先获得了对象锁并且执行到wait()方法,然后挂起进入到阻塞队列中。接着线程B获取对象锁,也同样调用wait()方法挂起。这个时候阻塞队列里面就有A和B两个线程了。在线程C中调用了notify()方法。这会唤醒阻塞队列中的一个线程,因此线程A被唤醒了,最后输出了threadA end wait

如果我们把notify换成notifyAll,结果又是怎样呢?

输出结果:

threadA get resourceA lock
threadA begin wait
threadB get resourceA lock
threadB begin wait
threadC begin notify
threadB end wait
threadA end wait
over

可以看到,如果调用了notifyAll()而不是notify(),那么resourceA的所有等待线程,包括线程A和B都会被唤醒,最后的over也被打印出来了。

Thread.join()

在上面的例子中我们加入了Thread.join(),这个方法的作用就是等待线程执行完毕。这个方法在实际工作场景中经常会用到,例如我们现在需要调用接口A,然后在A返回的结果中有一个参数需要用来调用接口B,那么我们就需要先等待A返回结果,然后才能去调用B。或者是现在需要请求一个List,我们需要发送多个请求,等待所有结果都返回了再统一处理。类似这种需要等待结果的情况,我们就可以用join()方法来实现。

举个例子:

package foundation;

/**
 * @author : BlueLzy
 * @email : bluehobert@gmail.com
 * @date : 2020/07/18
 * @desc :
 */
public class JoinDemo {

    public static void main(String[] args) throws InterruptedException {
        Thread threadOne = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("child threadOne over!");
            }
        });

        Thread threadTwo = new Thread(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("child threadTwo over!");
        });

        threadOne.start();
        threadTwo.start();

        System.out.println("wait all child thread over");

        threadOne.join();
        threadTwo.join();

        System.out.println("all child thread over");
    }
}

输出结果:

wait all child thread over
child threadTwo over!
child threadOne over!
all child thread over

在上面的例子中,我们对ThreadOne睡眠1s,ThreadTwo睡眠3s,所以输出结果的顺序是:

打印 - wait all child thread over

1s后打印 - child threadOne over!

2s后打印 - child threadTwo over! 和 all child thread over

说明调用了join()方法后,主线程会等待所有线程结束后,再进行下一步的工作。

Thread.sleep()

如果我们需要暂停当前线程的工作,让其他线程先执行。这个方法就是sleep()

如果某个线程调用了sleep()方法,那么在这期间是不会参与CPU的调度的,但是同时,它持有的监视器资源是不会让出的。这点和wait() 不一样。

举个例子:

package foundation;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author : BlueLzy
 * @email : bluehobert@gmail.com
 * @date : 2020/07/18
 * @desc :
 */
public class SleepDemo {
    private static final Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        Thread threadA = new Thread(() -> {
           lock.lock();
           try {
               System.out.println("child threadA is in sleep");
               Thread.sleep(10000);
               System.out.println("child threadA is in awaked");
           } catch (InterruptedException e) {
               e.printStackTrace();
           } finally {
               lock.unlock();
           }
        });

        Thread threadB = new Thread(() -> {
           lock.lock();
           try {
               System.out.println("child threadB is in sleep");
               Thread.sleep(10000);
               System.out.println("child threadB is in awaked");
           } catch (InterruptedException e) {
               e.printStackTrace();
           } finally {
               lock.unlock();
           }
        });

        threadA.start();
        threadB.start();
    }
}

输出结果:

child threadA is in sleep
10s后:child threadA is in awaked
child threadB is in sleep
10s后:child threadB is in awaked

我们创建了线程A和B,在一开始都首先获取锁,然后让线程睡眠10s,最后再释放锁。

在A睡眠的期间,线程B是无法获得锁的,因为sleep()不会让出监视器资源,因此B只有在等待A执行了lock.unlock() 之后,才能获取锁并且执行代码。

Thread.yield()

上面说到的sleep() 方法是不会让出监视器资源的,那么如果我们需要线程让出资源呢?这个时候就要说到yield()方法了。

举个例子:

package foundation;

/**
 * @author : BlueLzy
 * @email : bluehobert@gmail.com
 * @date : 2020/07/18
 * @desc :
 */
public class YieldDemo implements Runnable{

    YieldDemo() {
        Thread t = new Thread(this);
        t.start();
    }

    @Override
    public void run() {
        for (int i=0; i<50; i++) {
            System.out.println(Thread.currentThread() + " running: " + i);
            if (i == 40) {
                System.out.println(Thread.currentThread() + " yield cpu..." + i);
                Thread.yield();
            }
        }

        System.out.println(Thread.currentThread() + " is over");
    }

    public static void main(String[] args) {
        new YieldDemo();
        new YieldDemo();
    }
}

输出结果:

Thread[Thread-0,5,main] running: 40
Thread[Thread-1,5,main] running: 36
Thread[Thread-0,5,main] yield cpu...40
Thread[Thread-1,5,main] running: 37
Thread[Thread-0,5,main] running: 41
Thread[Thread-1,5,main] running: 38
Thread[Thread-0,5,main] running: 42
...

我们从结果可以看到,线程0在i=40的时候调用了yield()方法,让出了CPU,于是线程1获得了CPU的执行权,打印出了37。

当然,yield() 不保证让出资源之后就不再进行争夺了,因此也有可能会打印出以下结果:

Thread[Thread-0,5,main] running: 40
Thread[Thread-1,5,main] running: 36
Thread[Thread-0,5,main] yield cpu...40
Thread[Thread-0,5,main] running: 41
Thread[Thread-0,5,main] running: 42
...

也就是说,我让出了资源,但是我又抢到了,那我就继续执行呗。

通过以上例子,我们再来总结一下yield()方法的作用:让当前线程让出CPU,哪怕当前时间片还没轮转完,也直接让出资源,让所有就绪状态的线程进行资源争夺,这个和线程的优先级也有关系,因此存在上一个线程让出资源后又继续执行的情况。

问题:sleep()和yield()的区别是什么?

sleep() 之后线程会进入挂起阻塞状态,但是yield() 之后线程还是就绪状态,仍然会去抢夺CPU资源。

总结

我们说了Thread类里面经常用到的关于线程状态切换的方法。下面总结一下线程的几个不同的状态:

状态 描述 切换
Ready 线程正在等待使用CPU,经调度程序调用之后可以进入running状态 Object.wait() / Thread.join() 从RUNNABLE到Waiting
Running 线程正在使用CPU Object.wait() / Thread.join() 从RUNNABLE到Waiting
Waiting 线程进入阻塞挂起状态,一般是调用wait或者等待I/O资源 Object.notify() / Object.notifyAll() 从Waiting 到 RUNNABLE
Terminated 线程结束
New 创建线程,此时线程还没启动 Thread.start() 从NEW 到 Runnable

上下文切换

在多线程环境中,当一个线程的状态由RUNNABLE转换为非RUNNABLE时,相应线程的上下文信息(包括CPU的寄存器和程序计数器在某一时间点的内容等)需要被保存,方便后面线程再次进入RUNNABLE时能够在之前执行进度的基础上继续前进。对上下文信息的保存和恢复,就是上下文切换。

举个例子:

现在有线程A和线程B,但是CPU只有一个核,在某一时间内只有一个线程能够被执行,那么在A线程执行完一个CPU时间片之后,需要让出资源,这个时候要把执行的进度给保存下来,进入线程B。然后再次轮到A执行的时候,我们就能恢复到之前执行的地方,继续执行下去。

死锁

说完了上下文切换,我们可以了解一下死锁。死锁是指多个线程在执行过程中,因为抢夺资源而造成的互相等待的现象。

死锁的产生必须具备以下四个条件:

  • 互斥:线程对已经获取到的资源进行排它性使用,也就是这个资源同一时间只能有一个线程占用。
  • 请求并保持:一个线程已经持有了一个资源,然后他又发起新的资源请求,但是这个资源被其他线程持有,这个时候它就进入了阻塞状态,同时它持有的资源也不会被释放、
  • 不可剥夺:在线程使用完资源之前,这个资源不能被其他线程抢占。
  • 环路等待:也就是说等待资源的线程形成了环路,每个线程需要的资源正好都被其他线程所占用。

举个例子:

package foundation;

/**
 * @author : BlueLzy
 * @email : bluehobert@gmail.com
 * @date : 2020/07/18
 * @desc :
 */
public class DeadLockDemo {
    private static Object resourceA = new Object();
    private static Object resourceB = new Object();

    public static void main(String[] args) {
        Thread threadA = new Thread(() -> {
           synchronized (resourceA) {
               System.out.println(Thread.currentThread() + " get ResourceA");

               try {
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }

               System.out.println(Thread.currentThread() + " waiting get resourceB");
               synchronized (resourceB) {
                   System.out.println(Thread.currentThread() + "get resourceB");
               }
           }
        });

        Thread threadB = new Thread(() -> {
           synchronized (resourceB) {
               System.out.println(Thread.currentThread() + " get resourceB");

               try {
                   Thread.sleep(1000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }

               System.out.println(Thread.currentThread() + " waiting get resourceA");
               synchronized (resourceA) {
                   System.out.println(Thread.currentThread() + " get resourceA");
               }
           }
        });

        threadA.start();
        threadB.start();
    }
}

输出结果:

Thread[Thread-0,5,main] get ResourceA
Thread[Thread-1,5,main] get resourceB
Thread[Thread-0,5,main] waiting get resourceB
Thread[Thread-1,5,main] waiting get resourceA

我们让A和B分别持有资源A和资源B,然后线程睡眠1s之后去请求对方的资源,这个时候不可避免就进入了死锁状态,两个线程的资源不会被释放,一直在等待。

那么如何避免死锁呢?

  • 破坏请求并持有条件
  • 破坏环路等待条件

在上面的例子中,如果我们让两个线程的请求资源顺序相同,那么就可以避免死锁状态了,当A和B一开始都需要请求资源A,这个时候只有一个线程能获得锁,另外一个线程就必须等待,从而破坏了请求并持有的条件。

参考资料

 
 
 

本文以创作共用版权协议发布,转载本文要求遵循“署名-非商业性使用-相同方式共享3.0”的创作共用协议,并以链接形式指明本文地址。

本文链接地址:https://www.bluelzy.top/java-advance-concurrent-one/

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注