线程通信的几种方式 线程通信的三种方式



文章插图
线程通信的几种方式 线程通信的三种方式

文章插图
简介:
线程开始运行,拥有自己的栈空间,就会如同一个脚本一样,按照既定的代码一步步的执行,直到终止 。但是,如果每个线程之间都是孤立的,那么它们的价值就会很少;反之,如果多个线程能够配合着完成工作,将会带来各方面巨大的收益 。
1、volatile和synchronized关键字
说明:(不做过多说明,需要的话可以看我的往期)
Java支持多线程访问一个对象或者对象的成员变量,由于每个线程都拥有这个变量的拷贝(为了执行速度更快),所以程序执行过程中读取的数据往往不是最新的 。
关键字volatile可以用来修饰字段(成员变量),作用通俗来讲就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新到共享内存中,volatile能保证线程对变量的可见性 。
关键字synchronized可以修饰方法或者同步代码块的形式来进行使用,它主要能确保多个线程在同一时刻,只有一个线程处于方法或者同步代码块中,它保证了线程对变量访问的可见性和排他性 。
通过使用javap工具查看生成class文件信息来分析下synchronized关键字的实现细节,如下代码是使用了同步块和同步方法 。
代码示例:
package com.lizba.p3;/** * <p> *同步方法和同步代码块示例代码 * </p> * * @Author: Liziba * @Date: 2021/6/15 22:13 */public class Synchronized {public static void main(String[] args) {// 同步代码块synchronized (Synchronized.class) {}// 静态方法method();}public static synchronized void method() {}}在Synchronized.class同级目录执行javap -v Synchronized.class
javap -v Synchronized.class重点关注部分输出:
同步代码块使用monitorenter和monitorexit指令
同步方法使用了ACC_SYNCHRONIZED
总结:
同步代码块和同步方法使用了不同的方式来加锁,其本质上都是对一个对象的监视器(monitor)获取,而这个获取的过程是排他的,也就是说同一时刻只会有一个线程获取由synchronized所保护的监视器 。我们知道,任意一个对象都拥有自己的监视器锁,当这个对象由同步代码块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该线程对象的监视器锁才能进入同步块或者同步方法,而没有获取到监视器(执行该方法)的线程将会被阻塞在同步代码块和同步方法的入口处,进入BLOCKED状态 。
图示对象、对象的监视器、同步队列和执行线程之间的关系
总结上图:
任意线程对Object(受Synchronized保护)的访问,首先要获取Object的监视器 。如果获取失败则进入同步队列,线程变为BLOCKED 。当访问Object的前驱(获得了锁的线程)释放了锁,则该释放操作唤醒阻塞在同步队列中的线程,使其重新尝试对监视器的获取 。
2、等待通知机制
一个线程修改了一个对象的值,另一个对象感知到其的变化,然后进行相应的操作,这种类似于生产者-消费者模式的功能,在Java线程之间是怎么实现的呢?
最简单的做法:
// 使用while循环检测变量的值while (flag) {// 防止一直执行,未满足条件进行短暂睡眠Thread.sleep(1000);}doSomething();上述代码存在问题:
不能确保及时性,通过睡眠的方式来释放处理器资源,会导致时效性问题难以降低开销,通过降低睡眠的时间来提升时效性又会带来过高的处理器资源开销
Java内置解决办法:
以上两个看似矛盾的问题,却可以通过Java内置的等待/通知机制很好的得以解决,等待/通知机制是Java任意对象都具备的,因为这些方法被定义在对象的超类Object中 。
public final native void notify();public final native void notifyAll();public final void wait() throws InterruptedExceptionpublic final native void wait(long timeout) throws InterruptedException;public final void wait(long timeout, int nanos) throws InterruptedException
方法名称描述notify()通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是该线程获取到了对象的锁notifyAll()通知所有等待在该对象上的线程wait()调用方的线程进入WAITING状态,只有等待其他线程的通知或被中断才会返回,需要注意,调用wait()方法会释放锁wait(long)超时等待一段时间,如果时间到没有通知就超时返回 。单位mswait(long, int)对于超时时间做更加细粒度的控制可以精确到纳秒
等待/通知机制描述:
等待/通知机制是指一个线程A调用了对象O的wait()方法进入等待状态,另一个线程B调用了对象O的notify()或notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作 。对象O上的wait()和notify()/notifyAll()就好比一个开关信号,用来完成等待方和通知方的交互工作(就好比一开始说的生产者-消费者模型)
示例代码:
package com.lizba.p3;import com.lizba.p2.SleepUtil;import java.text.SimpleDateFormat;import java.util.Date;/** * <p> *wait()和notify()/notifyAll()示例代码 * </p> * * @Author: Liziba * @Date: 2021/6/15 23:28 */public class WaitNotify {static boolean flag = true;static Object lock = new Object();static final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");public static void main(String[] args) {Thread waitThread = new Thread(new Wait(), "waitThread");waitThread.start();SleepUtil.sleepSecond(1);Thread notifyThread = new Thread(new Notify(), "notifyThread");notifyThread.start();}/*** wait线程,当条件不满足时wait()*/static class Wait implements Runnable{@Overridepublic void run() {// 加锁synchronized(lock) {// 当条件不满足时,继续waitwhile (flag) {System.out.println(Thread.currentThread()+ " flag is true. wait at " +sdf.format(new Date()));try {// 此操作会释放锁lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}// 满足条件是完成工作System.out.println(Thread.currentThread()+ " flag is false. finished at " + sdf.format(new Date()));}}}static class Notify implements Runnable {@Overridepublic void run() {// 加锁synchronized (lock) {// 获取到锁或通知等待在锁上的线程// 通知不会释放锁,直到当前线程执行完释放lock锁后,waitThread才能从wait方法返回System.out.println(Thread.currentThread()+ "hold lock. notify at " + sdf.format(new Date()));lock.notifyAll();flag = false;SleepUtil.sleepSecond(5);}// 再次加锁synchronized (lock) {System.out.println(Thread.currentThread()+ "hold lock again. notify at " + sdf.format(new Date()));SleepUtil.sleepSecond(5);}}}}查看输出:
注意上述的hold lock again 和 flag is flase这两行代码可能执行顺序会互换 。
总结:
【线程通信的几种方式 线程通信的三种方式】使用wait()、notify()和notifyAll()需要先对该对象加锁调用wait()方法后线程由RUNNING状态变为WAITING状态,并且将当前线程放置到对象的等待队列中notify()方法和notifyAll()调用后,等待的线程需要等到调用notify()和notifyAll()的线程释放锁后,等待队列中的线程才有机会从wait()返回notify()移动一个线程从等待队列到同步队列,notifyAll()移动所有等待线程,过程是将线程从等待队列移动到同步队列中,被移动的线程由WAITING变为BLOCKED状态从wait()方法返回的前提是获取了对象的锁wait()、notify()和notifyAll()机制依赖的是同步机制,其目的是为了从wait()方法返回的线程能感知到其他线程对变量作出的修改
图示上述过程:
总结上图:
WaitThread线程首先获取了锁,然后调用对象的wait()方法,从而释放了锁进入对象的等待队列WaitQueue中,进入等待状态 。由于WaitThread释放了对象的锁,NotifyThread随后获取了对象的锁,并且调用了对象的notify()方法,将处于等待队列WaitQueue的WaitThread移动到了SynchronizedQueue中,此时WaitThread的状态变为阻塞状态 。NotifyThread释放了锁之后,WaitThread再次获取到锁从wait()方法返回继续执行 。
3、等待/通知的经典范式
等待/通知的经典范式,分为等待方和通知方,这两者需要分别遵循如下规则 。
等待方遵循如下规则:
获取对象的锁如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件条件满足则执行对应的逻辑
// 示例等待方伪代码synchronized(对象) {while(条件不满足) {对象.wait();}// ToDo...}通知各方遵循如下规则:
获取对象的锁改变条件通知所有等待在对象身上的线程
// 示例通知方伪代码synchronized(对象) {改变条件对象.notifyAll();}4、管道输入/输出流
管道输入/输出流和普通文件输入/输出流或者网络输入/输出流的不同之处在于,管道输出/输出流主要用于线程之间的数据传输,传输的媒介为内存 。
管道输入/输出流的具体实现:
PipedInputStreamPipedOutputStreamPipedReaderPipedWriter
1、2为字节流,3、4为字符流 。
示例代码:
package com.lizba.p3;import java.io.IOException;import java.io.PipedReader;import java.io.PipedWriter;/** * <p> *管道流 * </p> * * @Author: Liziba * @Date: 2021/6/16 21:07 */public class Piped {public static void main(String[] args) throws IOException {PipedWriter out = new PipedWriter();PipedReader in = new PipedReader();// 输入输出流连接(不连接会报错)out.connect(in);Thread printThread = new Thread(new Print(in), "PrintThread");printThread.start();// 输入int receive = 0;try {while ((receive = System.in.read()) != -1) {out.write(receive);}} finally {out.close();}}/*** 单个字符读取并输出**/static class Print implements Runnable {private PipedReader in;public Print(PipedReader in) {this.in = in;}@Overridepublic void run() {int receive = 0;try {while (true) {// 单个字符读取if ((receive = in.read()) != -1){System.out.print((char)receive);}}} catch (IOException e) {e.printStackTrace();}}}}测试代码样例:
## 输入hello liziba## 输出hello liziba5、Thread.join()
Thread.join()的语义含义:当前线程A等待Thread线程终止之后才从Thread.join()处返回 。线程提供的join()方法的api如下:
public final void join() throws InterruptedException// 下面两个具有超时等待,线程再给定的时间没有返回,那么超时的方法会返回public final synchronized void join(long millis, int nanos)public final synchronized void join(long millis)示例代码:
设置十个线程,分别从0-9,每个线程需要调用前一个线程的join()方法,比如线程0结束了,线程1才能从join()返回线,程1结束了,线程2才能从join()返回 。
package com.lizba.p3;import com.lizba.p2.SleepUtil;import java.util.concurrent.TimeUnit;/** * <p> *join()等待通知机制 * </p> * * @Author: Liziba * @Date: 2021/6/16 21:25 */public class Join {public static void main(String[] args) {// 前一个线程Thread previous = Thread.currentThread();for (int i = 0; i < 10; i++) {Thread t = new Thread(new Domino(previous), String.valueOf(i));t.start();previous = t;}SleepUtil.sleepSecond(5);System.out.println(Thread.currentThread().getName() + " end.");}static class Domino implements Runnable {private Thread thread;public Domino(Thread thread) {this.thread = thread;}@Overridepublic void run() {try {thread.join();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " end.");}}}查看输出结果:
总结上述代码:
每个线程终止的前提是前驱线程的终止,每个线程等待前驱线程终止后,才从join()返回,这里涉及了等待/通知机制,具体原理我们可以通过看JDK的源码来了解:
public final synchronized void join(long millis)throws InterruptedException {long base = System.currentTimeMillis();long now = 0;if (millis < 0) {throw new IllegalArgumentException("timeout value is negative");}// 超时等待时间未设置则为0,也就是join()方法if (millis == 0) {// 判断当前线程是否终止while (isAlive()) {// 如果未终止,继续wait()wait(0);}} else {// 判断当前线程是否终止while (isAlive()) {long delay = millis - now;// 判断超时等待时间是否已经到了,如果到了则返回if (delay <= 0) {break;}// 否则继续等待,计算新的时间传入wait(delay);now = System.currentTimeMillis() - base;}}}// 尝试判断当前线程时候已经执行完毕(是否还活着)public final native boolean isAlive();6、ThreadLocal的使用
本文不会详细讲述ThreadLocal的核心原理,之后简单的介绍ThreadLocal的使用,后续会单独分一篇文章来详述其原理和使用 。
ThreadLocal即线程变量,它是以ThreadLocal对象为键、任意对象为值的存储结构 。这个存储结构可以附带在线程上,我们可以通过一个ThreadLocal对象来查询绑定在这个线程上的一个值 。
示例代码:
如下代码构造一个计算方法调用时间计算的类 。
package com.lizba.p3;import com.lizba.p2.SleepUtil;/** * <p> * * </p> * * @Author: Liziba * @Date: 2021/6/16 22:04 */public class Profiler {private static final ThreadLocal<Long> TIME_THREAD_LOCAL = new ThreadLocal<Long>() {@Overrideprotected Long initialValue() {return System.currentTimeMillis();}};public static final void begin() {TIME_THREAD_LOCAL.set(System.currentTimeMillis());}public static final Long end() {return System.currentTimeMillis() - TIME_THREAD_LOCAL.get();}public static void main(String[] args) {Profiler.begin();SleepUtil.sleepSecond(1);System.out.println("Cost: " + Profiler.end());}}查看执行结果: