java基础 - 线程

it2022-05-05  128

5.线程

进程:是正在运行的程序。

是系统进行资源分配和调用的独立单位。每一个进程都有它自己的内存空间和系统资源。

线程:是进程中的单个顺序控制流,是一条执行路径。

单线程:一个进程如果只有一条执行路径,则称为单线程程序。多线程:一个进程如果有多条执行路径,则称为多线程程序。

5.1 线程的创建方式

实现Runnable接口实现Callable接口继承Thread类

5.1.1 实现Runnable接口

步骤:

​ (1)定义Runnable接口的实现类,并重写改接口的run()方法。

​ (2)创建Runnable实现类的实例,并以此实例作为Thread的target对象,即该Thread对象才是真正的线程对象。

实现代码:

package 多线程; import java.util.concurrent.TimeUnit; public class LiftOff implements Runnable{ protected int countDown = 10; private static int taskCount = 0; private final int id = taskCount++; public LiftOff(){ } public LiftOff(int countDown){ this.countDown = countDown; } public String status(){ return "#" + id +"(" + (countDown > 0 ? countDown : "LiftOff!") +"),"; } @Override public void run() { while(countDown-- > 0){ System.out.println(status()); Thread.yield();//当前线程让出cpu资源 // try { // Thread.sleep(1000); // TimeUnit.MILLISECONDS.sleep(1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } } } public static void main(String[] args) { for(int i = 0;i<2;i++){ Thread t = new Thread(new LiftOff()); t.start(); } System.out.println("Waiting for LiftOff"); } }

1. 使用Executor(执行器)管理Thread对象

https://www.jianshu.com/p/bbcf4921797c

https://www.cnblogs.com/fengsehng/p/6048610.html

使用new Thread()的缺点

(1)每次new Thread()耗费性能

(2)调用new Thread()创建的线程缺乏管理,被称为野线程,而且可以无限制创建,之间相互竞争,会导致过多占用系统资源导致系统瘫痪。

(3)不利于扩展,比如定时执行,定期执行,线程终端。

采用线程池的优点

​ (1)重用存在的线程,减少对象的创建、消亡的开销,性能佳。

​ (2)可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。

​ (3)提供定时执行、定期执行、单线程、并发数控制等功能。

Executor的介绍

其内部使用了线程池机制,它在java.util.concurrent包下,通过该框架来控制线程的启动、执行和关闭。可以简化并发编程的操作。因此,在java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理、效率更好外,还有关键的一点:有助于避免this逃逸问题(如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,这意味着该任务能够访问处于不稳定状态的对象。)

Executor框架包括:线程池、Executor,Executors、ExecutorService、CompletionService、Future、Callable等。

Executors工厂类

Executors提供四种线程池:

newFixedThreadPool、newCachedThreadPool、

newSingleThreadExecutor、 newScheduledThreadPool。

newFixedThreadPool: 创建固定数目的线程池。最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到线程池中的某个线程终止直接被移出池子。newCachedThreadPool: 创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有60秒未被使用的线程。newSingleThreadExecutor: 创建一个单线程化得Executor,相当于线程数量为1得FixedThreadPoolnewScheduledThreadPool: 创建一个支持定时以及周期性的任务执行的线程池,多数情况下可用来代替Timer类。

ExecutorService

ExcutorService继承Executor 。

Executor中只有一个方法: void execute(Runnable command); //执行任务

​ ExcutorService中的方法:

public interface ExecutorService extends Executor { void shutdown();//顺次地关闭ExecutorService,停止接收新的任务,等待所有已经提交的任务执行完毕之后,关闭ExecutorService List<Runnable> shutdownNow();//阻止等待任务启动并试图停止当前正在执行的任务,停止接收新的任务,返回处于等待的任务列表 boolean isShutdown();//判断线程池是否已经关闭 boolean isTerminated();//如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。 boolean awaitTermination(long timeout, TimeUnit unit)//等待(阻塞)直到关闭或最长等待时间或发生中断,timeout - 最长等待时间 ,unit - timeout 参数的时间单位 如果此执行程序终止,则返回 true;如果终止前超时期满,则返回 false <T> Future<T> submit(Callable<T> task);//提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。该 Future 的 get 方法在成功完成时将会返回该任务的结果。 <T> Future<T> submit(Runnable task, T result);//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功完成时将会返回给定的结果。 Future<?> submit(Runnable task);//提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。该 Future 的 get 方法在成功 完成时将会返回 null <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)//执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。 throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)//执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。返回列表的所有元素的 Future.isDone() 为 true。 throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks)//执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。 throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

2. 用Executor执行Runnable任务(ExecutorService.execute())

package 多线程; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class LiftOff implements Runnable{ protected int countDown = 10; private static int taskCount = 0; private final int id = taskCount++; public LiftOff(){ } public LiftOff(int countDown){ this.countDown = countDown; } public String status(){ return "#" + id +"(" + (countDown > 0 ? countDown : "LiftOff!") +"),"; } @Override public void run() { while(countDown-- > 0){ System.out.println(status()); Thread.yield();//当前线程让出cpu资源 // try { // Thread.sleep(1000); // TimeUnit.MILLISECONDS.sleep(1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } } } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0;i<2;i++){ exec.execute(new LiftOff()); } System.out.println("Waiting for LiftOff"); } }

5.1.2 实现Callable

Runnable是执行工作的独立任务,但是它不返回任何值。如果你希望任务在完成时能够返回一个值,那么可以实现Callable接口。在Java SE5中引入的Callable是一种具有类型参数的泛型,它的类型参数表示的是从方法call()(而不是run()方法) 中的返回值,并且必须使用ExecutorService.submit() 方法调用它。

1. Executor执行Callable任务(ExecutorService.submit())

package 多线程; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; class TaskWithResult implements Callable<String>{ private int id; public TaskWithResult(){ this.id = id; } @Override public String call() throws Exception { return "Result of TaskWithResult:" + id; } } public class CallableDemo { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); ArrayList<Future<String>> results = new ArrayList<Future<String>>(); for(int i = 0 ; i < 10; i++){ results.add(exec.submit(new TaskWithResult())); } for (Future<String> future : results) { try { System.out.println(future.get()); } catch (InterruptedException e) { System.out.println(e); } catch (ExecutionException e) { System.out.println(e); } } } }

​ submit()方法会产生Future对象,它用Callable返回结果的特定类型进行了参数化。你可以用isDone()方法来查询Future是否完成。当任务完成时,它具有一个结果,你可以调用 get()方法来获取该结果。你也可以不用isDone()进行检查就直接调用 get(),在这种情况下,get()将阻塞,直至结果准备就绪。

//1.直接调用get()方法,当任务执行完后获取返回结果 future.get(); //2.在规定时间内还没获取到返回结果(任务还没执行完),则抛出异常TimeoutException future.get(1,TimeUnit.SECONDS); //3.先判断任务是否已经执行完,在获取返回结果 if(future.isDone){ future.get(); }

2. 休眠(TimeUnit.时间单位.sleep())

package 多线程; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class LiftOff implements Runnable{ protected int countDown = 10; private static int taskCount = 0; private final int id = taskCount++; public LiftOff(){ } public LiftOff(int countDown){ this.countDown = countDown; } public String status(){ return "#" + id +"(" + (countDown > 0 ? countDown : "LiftOff!") +"),"; } @Override public void run() { while(countDown-- > 0){ System.out.println(status()); //Thread.yield();//当前线程让出cpu资源 try { //Thread.sleep(1000); TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0;i<2;i++){ exec.execute(new LiftOff()); } System.out.println("Waiting for LiftOff"); } }

**TimeUnit:**TimeUnit是java.util.concurrent包下面的一个类,TimeUnit提供了可读性更好的线程暂停操作,通常用来替换Thread.sleep()。它是一个静态方法,暂停线程时它不会释放锁,该方法会抛出InterrupttedException异常(如果有线程中断了当前线程)。用Thread.sleep()不易看出程序的睡眠时间,TimeUnit允许指定sleep()延迟的时间单元,还可以被用来执行转换。

时间单位:

TimeUnit.DAYS //天 TimeUnit.HOURS //小时 TimeUnit.MINUTES //分钟 TimeUnit.SECONDS //秒 TimeUnit.MILLISECONDS //毫秒 TimeUnit.MICROSECONDS //微妙 TimeUnit.NANOSECONDS //纳秒

转换:

public long toMillis(long d) //转化成毫秒 public long toSeconds(long d) //转化成秒 public long toMinutes(long d) //转化成分钟 public long toHours(long d) //转化成小时 public long toDays(long d) //转化天

3. 优先级

每个线程均有优先级,在Thread中,与优先级对应的属性如下:

/** * The minimum priority that a thread can have. 线程的最小优先级 */ public final static int MIN_PRIORITY = 1; /** * The default priority that is assigned to a thread. 线程默认的优先级 */ public final static int NORM_PRIORITY = 5; /** * The maximum priority that a thread can have. 线程的最大优先级 */ public final static int MAX_PRIORITY = 10; package 多线程; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 3.优先级 * @author Administrator * */ public class SimplePriorities implements Runnable{ private int countDown = 5; private volatile double d; private int priority; public SimplePriorities(int priority){ this.priority = priority; } @Override public String toString() { return Thread.currentThread() + ":" +countDown; } @Override public void run() { Thread.currentThread().setPriority(priority); while(true){ for(int i = 1;i<1000;i++){ d += (Math.PI + Math.E) / (double)i; if(i % 1000 == 0){ Thread.yield(); } } System.out.println(this); if(--countDown == 0) { return; } } } public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(); for(int i = 0 ; i < 5 ; i++){ exec.execute(new SimplePriorities(Thread.MIN_PRIORITY)); } exec.execute(new SimplePriorities(Thread.MAX_PRIORITY)); exec.shutdown(); } }

4. 让步(yield())

如果知道已经完成了在run()方法的循环一次迭代过程中所需的工作,就可以给线程调度机制一个暗示:你的工作已经做得差不多了,可以让别的线程使用CPU了。这个暗示通过调用yield()方法来作出(不过这只是一个暗示,没有任何机制保证它将会被采纳)。当调用yield()时,你也是在建议具有相同优先级的其他线程可以运行。

5.后台线程(daemon)

所谓后台线程,是指在程序运行的时候在后台提供一种通用服务的线程,并且这种线程并不属于程序中不可或缺的部分。因此,当所有的非后台线程结束时,程序也就终止了,同时会杀死进程中的所有后台线程。(执行main的就是一个非后台程序。)

package 多线程; import java.util.concurrent.TimeUnit; //4. 后台线程 public class SimpleDaemons implements Runnable{ @Override public void run() { try { while(true){ TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread()+" " +this); } } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { for(int i = 0 ;i < 10 ;i++){ Thread daemon = new Thread(new SimpleDaemons()); daemon.setDaemon(true);//设置成后台线程,必须在start之前设置 daemon.start(); } System.out.println("All daemons started"); TimeUnit.SECONDS.sleep(5); } }

6.ThreadFactory

根据需要创建新线程的对象,使用线程工厂就无需再手工编写 new Thread 的调用了,从而允许应用程序使用特殊的线程子类、属性等等。

package 多线程; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; public class DaemonFromFactory implements Runnable{ @Override public void run() { while(true){ try { TimeUnit.MILLISECONDS.sleep(100); System.out.println(Thread.currentThread() + " " + this); } catch (InterruptedException e) { System.out.println("Interrupted"); } } } public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory()); for(int i = 0 ; i<2 ;i++){ exec.execute(new DaemonFromFactory()); } System.out.println("All daemons started"); TimeUnit.SECONDS.sleep(2); } } class DaemonThreadFactory implements ThreadFactory{ @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }

当最后一个非后台线程终止时,后台线程会“突然”终止。因此一旦mian()退出,JVM就会立即关闭所有的后台进程,而不会有任何你希望出现的确认形式。

所以:当非后台线程终止后,后台线程会立即终止,不会执行finally方法

package 多线程; import java.util.concurrent.TimeUnit; /** * 后台线程 * @author Administrator * */ public class DaemonsDontRunFinally { public static void main(String[] args) throws InterruptedException { Thread t = new Thread(new ADaemon()); t.setDaemon(true);//设置成后台程序后,finally方法不会执行 t.start(); //TimeUnit.SECONDS.sleep(5);延长非后台线程的时间,让后台程序执行完 } } class ADaemon implements Runnable{ @Override public void run() { try { System.out.println("Starting ADaemon"); TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }finally{ System.out.println("finally is run"); } } }

5.1.3 继承Thread类

package 多线程; public class SimpleThread extends Thread{ private int countDown = 5; private static int threadCount = 0; public SimpleThread(){ super(Integer.toString(++threadCount)); start(); } @Override public String toString() { return "#" + getName() +"(" + countDown + "),"; } @Override public void run() { while(true){ System.out.println(this); if(--countDown == 0){ return; } } } public static void main(String[] args) { for(int i = 0;i < 3; i++){ new SimpleThread(); } } }

1.加入一个线程(join())

一个线程可以在其他线程之上调用join()方法,其效果是等待一段时间直到第二个线程结束才继续执行。如果某个线程在另一个线程t上调用t.join(),此线程将被挂起,直到目标线程t结束才恢复(即t.isAlive()返回为假)。

也可以在调用join()时带上一个超时参数(单位可以是毫秒、微妙或纳秒),这样如果目标线程在这段时间到期时还没结束的话,join()方法总能返回。

对join()方法的调用可以被中断,做法是在调用线程上调用interrupt()方法,这时需要用到try-catch子句。

package 多线程; public class Joining { public static void main(String[] args) { Sleeper sleepy = new Sleeper("Sleepy",1500); Sleeper grumy = new Sleeper("Grumy",1500); Joiner dopey = new Joiner("Doper",sleepy); Joiner doc = new Joiner("Doc",grumy); grumy.interrupt(); } } class Sleeper extends Thread{ private int duration; public Sleeper(String name,int sleepTime){ super(name); duration = sleepTime; start(); } @Override public void run() { try { System.out.println(getName() + " is runing prepara sleeping"); sleep(1500);//sleep结束后该线程接着往下执行 System.out.println(getName() + " is runing over sleeping"); } catch (Exception e) { System.out.println(getName() + " was interrupted." + "isInterrupted():" + isInterrupted()); return; } } } class Joiner extends Thread{ private Sleeper sleeper; public Joiner(String name,Sleeper sleeper){ super(name); this.sleeper = sleeper; start(); } @Override public void run() { try { System.out.println(getName() + " is runing, sleeper is joining"); sleeper.join(); System.out.println(getName() + " is runing, sleeper is over joining"); } catch (InterruptedException e) { System.out.println("Interrupted"); } System.out.println(getName() + ":join completed"); } }

当一个线程在该线程上调用 interrupt() 时,将给该线程设定一个标志,表明该线程已经被中断。然而,异常被捕获时将清理这个标志,所以在catch子句中,在异常被捕获的时候这个标志总是为假。

注意:Java SE5的java.util.concurrent类库包含诸如CyclicBarrier这样的工具,它们可能比最初的线程类库中的join()更加适合。

2.捕获异常

在Java多线程程序中,所有线程都不允许抛出未捕获的checked Exception,也就是说各个线程需要自己把自己的checked Exception处理掉。

但是线程依然有可能抛出unchecked Exception(如运行时异常),当此类异常抛出,线程就会终结,而对于主线程或其他线程完全不受影响,且完全感知不到某个线程抛出的异常(也就是无法catch到)。

package 多线程; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExceptionThread implements Runnable{ public void run(){ throw new RuntimeException(); } public static void main(String[] args) { try{ ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new ExceptionThread()); }catch(Exception e){ System.out.println(e); } } }

为了解决这个问题:我们需要修改Executor产生线程的方式。Thread.UncaughtExceptionHandler是java SE5中的新接口,它允许你在每个Thread对象上都附着一个异常处理器。uncaughtExeception()方法会在线程因为捕获的异常而临近死亡时被调用。

package 多线程; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; //会抛出异常的线程 class ExceptionThread2 implements Runnable{ @Override public void run() { Thread t = Thread.currentThread(); System.out.println("run () by " + t); System.out.println("eh = " + t.getUncaughtExceptionHandler()); throw new RuntimeException(); } } //对线程中的异常做处理 class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{ @Override public void uncaughtException(Thread t, Throwable e) { System.out.println("caught:" + e); } } //对线程的创建做封装 class HandlerThreadFactory implements ThreadFactory{ @Override public Thread newThread(Runnable r) { System.out.println(this + " creating new Thread"); Thread t = new Thread(r); System.out.println("created " + t); //设置未捕获到的异常处理器 t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler()); System.out.println("eh = " + t.getUncaughtExceptionHandler()); return t; } } public class CaptureUncaughtException { public static void main(String[] args) { ExecutorService exec = Executors.newCachedThreadPool(new HandlerThreadFactory()); exec.execute(new ExceptionThread2()); //Thread t = new Thread(new ExceptionThread2()); //为该线程设置异常处理器 //t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler()); //为所有的线程都设置一个全局的异常处理器 // Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler()); // System.out.println(1/0); // t.start(); } }

这个处理器只有在不存在线程专有的未捕获异常处理器的情况下才会被调用。系统会检查线程专有版本,如果没有发现,则检查线程组是否有专有的uncaughtException()方法,如果也没有,再调用defaultUncaughtExceprion()方法。

5.2 线程同步

不正确的资源访问

package _01.不正确的资源访问; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; abstract class IntGenerator{ private volatile boolean canceled = false; public abstract int next(); public void cancel(){ canceled = true; } public boolean isCanceled(){ return canceled; } } public class EvenGenerator extends IntGenerator{ private int currentEventValue = 0; @Override public int next() { ++currentEventValue; //Thread.yield(); ++currentEventValue; return currentEventValue; } public static void main(String[] args) { EvenChecker.test(new EvenGenerator()); } } class EvenChecker implements Runnable{ private IntGenerator generator; private final int id; public EvenChecker(IntGenerator generator, int id) { super(); this.generator = generator; this.id = id; } @Override public void run() { while(!generator.isCanceled()){//canceled = false; int val = generator.next(); if(val % 2 != 0){ System.out.println(val + " not even!"); generator.cancel();//canceled = true; } } } public static void test(IntGenerator gp,int count){ System.out.println("Press Control-C to exit"); ExecutorService pool = Executors.newCachedThreadPool(); for(int i = 0; i < count;i++){ pool.execute(new EvenChecker(gp,i)); } pool.shutdown(); } public static void test(IntGenerator gp){ test(gp,10); } }

线程同步:

线程互斥:

线程的状态:

新建(new):当线程被创建时,它只会短暂地处于这种状态。此时它已经分配了必需的系统资源,并执行了初始化。此刻线程已经有资格获得CPU时间了,之后调度器将把这个线程转变为可运行状态或阻塞状态。就绪(Runnable):在这种状态下,只要调度器把时间片分配给线程,线程就可以运行。也就是说,在任意时刻,线程可以运行也可以不运行。只要调度器能分配时间片给线程,它就可以运行,这不同于死亡和阻塞状态。阻塞(Blocked):线程能够运行,但有某个条件阻止它的运行。当线程处于阻塞状态时,调度器将忽略此线程,不会分配给线程任何CPU时间。直到线程重新进入了就绪状态,它才有可能执行操作。死亡(Dead):处于死亡或终止状态的线程将不再是可调度的,并且再也不会得到CPU时间,它的任务已经结束,或不再是可运行的。任务死亡的通常方式是从 run()方法返回,但是任务的线程还可以被中断。

一个任务进入阻塞状态,可能有如下原因:

(1)通过调用sleep() 使任务进入休眠状态,在这种情况下,任务在指定的时间内不会运行。

(2) 你通过调用 wait() 使线程挂起。直到线程得到了 notify() 、notifyAll()消息(或者在java SE5的java.util.concurrent 类库中等价的signal() 、signalAll() 消息。),线程才会进入就绪状态。

(3)任务在等待某个输入/输出完成。

(4)任务试图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另一个任务已经获取了这个锁。

5.2.1 synchronized方法

​ 如果某个任务处于一个对标记为synchronized 的方法的调用中,那么在这个线程从该方法返回之前,其他所有要调用类中任何标记为 synchronized方法 的线程都会被阻塞。

​ 所有对象都自动含有单一的锁(也称为监视器)。当在对象上调用其任意synchronized方法的时候,此对象都被加锁,这时该对象上的其他synchronized方法只有等到前一个方法调用完毕并释放了锁之后才能被调用。

所以,对于某个特定对象来说,其所有synchronized方法共享同一个锁,这可以被用来防止多个任务同时访问被编码为对象内存。

针对每个类,也有一个锁(作为类的Class对象的一部分),所以synchronized static 方法可以在类的范围内防止对static数据的并发访问。

public class SynchronizedEvenGenerator extends IntGenerator{ private int currentEventValue = 0; @Override public synchronized int next() { ++currentEventValue; Thread.yield(); ++currentEventValue; return currentEventValue; } public static void main(String[] args) { EvenChecker.test(new SynchronizedEvenGenerator()); } }

5.2.2 使用显示的Lock对象

​ Java SE5的java.util.concurrent类库还包含有定义在 java.util.concurrent.locks中显示的互斥机制。Lock对象必须被显示地创建、锁定和释放。

//创建一个ReentrantLock实例 Lock lock = new ReentrantLock(); //锁定 lock.lock(); //......被锁资源 //释放 lock.unlock(); public class MutexEvenGenerator extends IntGenerator{ private int currentEventValue = 0; private Lock lock = new ReentrantLock(); @Override public int next() { lock.lock(); try{ ++currentEventValue; Thread.yield(); ++currentEventValue; return currentEventValue; }finally{ lock.unlock(); } } public static void main(String[] args) { EvenChecker.test(new MutexEvenGenerator()); } }

5.2.3 使用特殊域变量(volatile)实现线程同步

5.2.3.1 java内存模型(JMM)

​ Java 内存模型的主要目标是 定义程序中各个变量的访问规则,即在虚拟机中 将变量存储到内存 和 从内存中取出变量 这样的底层细节。此处的变量与 Java 变成中所说的变量有所区别,它包括了实例变量、静态变量和构成数组对象的元素,但不包括局部变量与方法参数,因为后者是线程私有的,不会被共享,自然就不会存在竞争问题。

JMM 就作用于工作内存和主存之间数据同步过程。它规定了如何做数据同步以及什么时候做数据同步。

Java内存模型规定了所有的变量都存储在主内存中(堆)中,此外每条线程还要自己的工作内存(栈)。 线程的工作内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作(读取、赋值等)都必须在工作内存中进行,不能直接读写到主内存中的变量。 并且,不同的线程之间也无法访问对方工作内存中的变量,线程间变量值的传递需要通过主内存来。 线程、主内存、工作内存关系如下图:

[外链图片转存失败(img-1PFflDiv-1565488053180)(E:\TyporaIImages\1564196963750.png)]

​ 关于 主内存 和 工作内存 之间具体的交互协议,即一个变量如何从主内存拷贝到工作内存、如何从工作内存同步回主内存之类的实现细节,Java 内存模型中定义了一下 8 种操作来完成,虚拟机实现时必须保证下面提及的每一种操作都是原子的、不可再分的(对于 double 和 long 类型的变量来说,在某些平台上允许有例外,这个问题将在下文中说明)。

lock(锁定):作用于主内存的变量,它把一个变量标识为一条线程独占的状态。unlock(解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后变量才可以被其他线程锁定。read(读取):作用于主内存的变量,它把一个变量的值从主内存传输到工作内存中,以便随后的 load 动作使用。load(载入):作用于工作内存的变量,它把 read 操作从主内存得到的变量值保存到工作内存的变量副本中。use(使用):作用于工作内存的变量,它把工作内存中的一个变量的值传递给执行引擎(一般是基于操作数栈的执行引擎),每当虚拟机遇到一个需要使用到该变量的值的 字节码指令时将会执行这个操作。assign(赋值):作用于工作内存的变量,它把从执行引擎接收到的值赋值给工作内存的变量(存放在局部变量表中),每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。store(存储):作用于工作内存的变量,它把工作内存中的变量传送到主内存中,以便随后的 write 操作使用。write(写入):作用于主内存的变量,它把 store 操作从工作内存中得到的变量的值方法主内存的变量中。

链接:https://www.jianshu.com/p/9e02547b0873

[外链图片转存失败(img-T2m0WaX8-1565488053181)(E:\TyporaIImages\1564198177250.png)]

如果对一个变量进行 lock 操作,那么将会清空工作内存中这个变量的值,在执行引擎使用这个变量前,需要重新执行 load 或 assign 操作初始化变量的值。

​ 如果一个变量事先没有被 lock 操作锁定,那就不允许对它执行 unlock 操作,也不允许去 unlock 一个被其他线程锁定住的变量。

对一个变量执行 unlock 之前,必须先把此变量同步回主内存中(执行 store 以及 write 操作)。

5.2.3.2 Java 内存模型中的可见性、原子性和有序性。

可见性

​ 可见性是一种复杂的属性,因为可见性中的错误总是会违背我们的直觉。通常,我们无法确保执行读操作的线程能适时地看到其他线程写入的值,有时甚至是根本不可能的事。为了确保多个线程之间对内存写入操作的可见性,必须使用同步机制。

​ **可见性。是指线程之间的可见性,一个线程修改的状态对另一个线程是可见的。**也就是一个线程修改的结果,另一个线程马上就能看到。比如:**用volatile修饰的变量,就会具有可见性。**volatile 的特殊规则保证了新值能立即同步到主内存,以及每次使用前立即从主内存刷新。除了 volatile 之外,Java 还有两个关键字能实现可见性,即 synchronized 和 final。同步也会导致向主存中刷新,因此如果一个域完全由 synchronized方法或语句块来防护,那就不必将其设置为是volatile的。同步块的可见性是由“对一个变量执行 unlock 操作之前,必须先把工作内存中此变量的值同步回主内存中(执行 store、write 操作)”这条规则获得的,而 final关键字的可见性是指:被 final 修饰的字段在构造器中一旦初始化完成,并且构造器没有把 “this” 引用传递出去,那在其他线程中就能看见 final 字段的值。

原子性

​ 原子性可以应用于除long、double之外的其它基本类型之上的 ”简单操作“。我们大致可以认为 基本数据类型的访问、读写是具备原子性的(例外就是 long 和 double 的非原子协定)。

​ 比如 a=0;(a非long和double类型) 这个操作是不可分割的,那么我们说这个操作时原子操作。再比如:a++; 这个操作实际是a = a + 1;是可分割的,所以他不是一个原子操作。非原子操作都会存在线程安全问题,需要我们使用同步技术(sychronized)来让它变成一个原子操作。一个操作是原子操作,那么我们称它具有原子性。java的concurrent包下提供了一些原子类,我们可以通过阅读API来了解这些原子类的用法。比如:AtomicInteger、AtomicLong、AtomicReference等。

也就是说 Java 内存模型中的 read、load、use、assign、store 和 wriet 这 6 种操作可以保证 基本数据类型 的原子性,而 Java 内存模型定义的 lock 和 unlock 操作则可以保证 基本数据类型 以外的数据类型的原子性。

有序性

Java语言提供了 volatile 和 synchronized(Lock) 关键字来保证线程之间操作的有序性,volatile 是因为其本身包含“禁止指令重排序”的语义,synchronized 是由“一个变量在同一个时刻只允许一条线程对其进行 lock 操作”这条规则获得的,此规则决定了持有同一个对象锁的两个同步块只能串行执行。

5.2.3.3 volatile变量具有的特性

当一个变量定义为volatile之后,它将具备两种特性:保证变量的可见性和禁止指令重排序优化。

保证此变量对所有的线程的可见性:当一个线程修改了这个变量的值,volatile保证了新值立即能同步到主内存,以及每次使用前立即从主内存刷新。但普通变量做不到这一点,普通变量的值在线程间传递需要通过主内存来完成。

​ 关于 volatile 变量的可见性,写操作在并发下是安全的,但是 volatile 变量的运算在并发下不是安全 的,因为 Java 里面的运算并非原子操作。

禁止指令重排序优化:普通的变量仅仅会保证在该方法的执行过程中所有依赖赋值结果的地方都能获取到正确的结果,而不能保证变量赋值操作的顺序与程序代码中的执行顺序一致。

​ 程序执行的顺序按照代码的先后顺序执行,禁止进行指令重排序。看似理所当然的事情,其实并不是这样,指令重排序是JVM为了优化指令,提高程序运行效率,在不影响单线程程序执行结果的前提下,尽可能地提高并行度。但是在多线程环境下,有些代码的顺序改变,有可能引发逻辑上的不正确。

5.2.3.4 使用volatile必须满足的两个条件。

​ 通过关键字synchronized可以防止多个程序进入同一段代码,在某些特定场景中,volatile相当于一个轻量级的synchronized,因为不会引起线程的上下围切换。

但是使用volatile必须满足两个条件:

 1、一个域的值不依赖它之前的值。如多线程下执行a++,是无法通过volatile保证结果准确性的;

2、该域的值不受其他域的值的限制(该变量没有包含在具有其他变量的不变式中)。

原因:

a++,不是一个原子操作,它的操作实际上分为:读取-修改-写入。因此在读取和写入之间,另一个任务可能会修改这个域。由volatile修饰的变量的操作必须是以原子方式执行,才能保证数据的正确性。

volatile变量不能用于约束条件中,反例如下:

public class NumberRange {   private volatile int lower = 0;   private volatile int upper = 10;   public int getLower() { return lower; }   public int getUpper() { return upper; }   public void setLower(int value) {   if (value > upper)   throw new IllegalArgumentException(...);   lower = value;   }   public void setUpper(int value) {   if (value < lower)   throw new IllegalArgumentException(...);   upper = value;   }   }

上述代码中,上下界初始化分别为0和10,假设线程A和B在某一时刻同时执行了setLower(8)和setUpper(5),且都通过了不变式的检查,设置了一个无效范围(8,5),所以在这种场景下,需要通过sychronize保证方法setLower和setUpper在每一时刻只有一个线程能够执行。

5.2.4 synchronized同步控制块

synchronized(this){ } synchronized(object){ }

5.3 线程通讯(线程之间的协作)

5.3.1 借助于Object类的wait()、notify()和notifyAll()实现通信

1wait()notify()notifyAll()方法是Object的方法。 (2wait() 会在等待外部世界产生变化的时候将任务挂起,并且只有在notify()notifyAll()发生时,这个任务才会被唤醒。 (3)调用 sleep() 的时候锁并没有被释放,调用 yield() 也属于这种情况。当一个任务在方法里遇到了对 wait() 的调用的时候,线程的执行被挂起,对象上的锁被释放,这就意味着另一个任务可以获得这个锁,因此在该对象中的其他 synchronized 方法可以在 wait() 期间被调用。 (4)只能在同步控制方法或同步控制块里调用 wait()notify()notifyAll()(因为不用操作锁,所以sleep()可以在非同步控制方法里调用。)。如果在非同步控制方法里面调用这些方法,程序能通过编译,但运行的时候,将得到 IllegalMonitorStateException异常。调用 wait()notify()notifyAll()的任务在调用这些方法之前必须获取对象的锁。 (5)在使用 wait() 之前必须对象的锁,而在使用notify()notifyAll()的时候,它只会唤醒具有相同锁的wait() 线程(只有等待这个锁的任务才会唤醒)。等待和唤醒必须是同一锁下的线程

总结:一个任务在调用 wait() 之前必须获取对象的锁,调用 wait()之后,线程进入阻塞状态并释放对象的锁,在使用 notify()、notifyAll() 的时候,只能唤醒在等待这个锁的任务。

对sleep、yield、join、wait的总结:

(1)sleep-线程睡眠 :Thread的静态方法,可以再非同步方法中调用,调用sleep方法后线程并不会释放锁,进入阻塞状态,睡眠时间到后进入就绪状态。

(2)yield-线程让步:Thread的静态方法,调用yield方法的线程让出CPU资源后直接进入就绪状态,与优先级相同的线程一起竞争CPU资源。

(3)join-线程合并:调用join的线程会进入wait,等待被调用(加入)的线程执行完后再唤醒该线程。

(4)wait-线程同步:Object的方法,必须在同步方法中调用,调用wait方法后线程释放锁,进入阻塞状态,等待同一个锁的notify、notifyAll唤醒。

单个生产者与单个消费者

package _05生产者与消费者; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 单个生产者,单个消费者。 */ /** * 饭店:饭店有厨师和服务员以及饭菜。厨师烧好一份饭菜通知服务员上菜,服务员接受订单通知厨师烧。 * @author Administrator * */ public class Restaurant { Meal meal; ExecutorService exec = Executors.newCachedThreadPool(); //厨师和服务员为两个线程 WaitPerson waitPerson = new WaitPerson(this); Chef chef = new Chef(this); public Restaurant(){ exec.execute(chef); exec.execute(waitPerson); } public static void main(String[] args) { new Restaurant(); } } class Meal{ private final int orderNum; public Meal(int orderNum){ this.orderNum = orderNum; } public String toString(){ return "Meal " + orderNum; } } //消费者:服务员 class WaitPerson implements Runnable{ private Restaurant restaurant; public WaitPerson(Restaurant r){ restaurant = r; } @Override public void run() { try { while(!Thread.interrupted()){ synchronized(this){ while(restaurant.meal == null){ wait();//当前对象(Restaurant 中的waitPerson)等待,并释放锁,以便再notify的时候获取该对象的锁对其唤醒。 } //等待chef唤醒该对象上的线程后,开始上菜,之后唤醒厨师烧菜。 System.out.println("WaitPerson got " + restaurant.meal); synchronized(restaurant.chef){ restaurant.meal = null; restaurant.chef.notifyAll();//唤醒厨师 } } } } catch (InterruptedException e) { System.out.println("WaitPerson is interrupted!"); } } } //生产者:厨师 class Chef implements Runnable{ private Restaurant restaurant; private int count = 0; public Chef(Restaurant r){ restaurant = r; } @Override public void run() { while(!Thread.interrupted()){ try { synchronized(this){ while(restaurant.meal != null){ wait();//等待服务员的唤醒 } if(++count == 10){ System.out.println("Out of food.closing"); //立即中断线程池中所有的线程,向线程池中所有的线程发送interrupt() //服务员将chef唤醒后进入挂起状态,此时被中断,抛出异常 //而厨师在接受到中断时,并没有进入阻塞状态,所以不会被立即中断,当线程chef试图调用sleep()时,抛出异常。 //如果没有sleep语句,线程将不会抛出异常,去检查最外面的while循环,条件为false退出循环。 restaurant.exec.shutdownNow(); } System.out.println("order up!"); synchronized(restaurant.waitPerson){ restaurant.meal = new Meal(count); restaurant.waitPerson .notifyAll(); } TimeUnit.MILLISECONDS.sleep(100); } } catch (InterruptedException e) { System.out.println(Thread.interrupted() + " Chef is interrupted!"); } } } }

5.3.2 使用显示的Lock和Condition对象。

将同步synchronized替换为显示的Lock操作。

将Object类中的 wait()、notify()、notifyAll()替换成了Condition对象(await()、signal() 和 signalAll()),该对象可以通过Lock锁对象获取。

一个Lock对象上可以绑定多个Condition对象。要为特定的Lock实例获得Condition实例,请使用其 newCondition()方法。

语法

Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); ... lock.lock(); try{ condition.await(); condition.signal(); condition.signalAll(); }finally{ lock.unclock(); } } package _06.使用显示的Lock和Condition对象; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class WaxOnMatic2 { public static void main(String[] args) throws InterruptedException { Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); //抛光和涂蜡都是对同一个 对象car,所以可以实现wait和notifyAll //抛光 exec.execute(new WaxOff(car)); //涂蜡 exec.execute(new WaxOn(car)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } } class Car{ private Lock lock = new ReentrantLock(); //任务在可以调用await()、signal()、signalAll()之前,必须拥有锁对象。 private Condition condition = lock.newCondition(); //涂蜡、抛光的处理状态 private boolean waxOn = false; //将车涂蜡,唤醒当前对象的线程去抛光。 public void waxed(){ lock.lock(); try{ waxOn = true; condition.signalAll(); }finally{ lock.unlock(); } } //将车抛光,唤醒当前对象的线程去涂蜡。 public void buffed(){ lock.lock(); try{ waxOn = false; condition.signalAll(); }finally{ lock.unlock(); } } //车已经抛光,线程等待其被涂蜡 public void waitForWaxing() throws InterruptedException{ lock.lock(); try{ while(waxOn == false){ condition.await(); } }finally{ lock.unlock(); } } //车已涂蜡,线程等待其被抛光 public void waitForBuffing() throws InterruptedException{ lock.lock(); try{ while(waxOn == true){ condition.await(); } }finally{ lock.unlock(); } } } //涂蜡 class WaxOn implements Runnable{ private Car car; public WaxOn(Car car) { this.car = car; } @Override public void run() { try { while(!Thread.interrupted()){ System.out.println("Wax on! "); TimeUnit.SECONDS.sleep(1); //涂蜡,并唤醒正在等待的线程 car.waxed(); //车已涂蜡,该线程等待 car.waitForBuffing(); } } catch (InterruptedException e) { System.out.println("Wax on is Interrupted ! "); } System.out.println("Ending Wax on task !"); } } //抛光 class WaxOff implements Runnable{ private Car car; public WaxOff(Car c) { this.car = c; } @Override public void run() { try { while(!Thread.interrupted()){ //判断是否已经抛光,如果已经抛光则等待 car.waitForWaxing(); System.out.println("Wax Off!"); TimeUnit.SECONDS.sleep(1); //抛光,并唤醒正在等待的线程 car.buffed(); } } catch (InterruptedException e) { System.out.println("Wax Off is interrupted !"); } System.out.println("Ending Wax Off task !"); } } 生产者-消费者与队列

5.3.3 使用阻塞队列(BlockingQueue)控制线程通信

​ wait() 和 notifyAll()方法以一个非常低级的方式解决了任务互操作问题,即每次交互时都握手。在许多情况下,你可以瞄向更高的抽象级别,使用同步队列 来解决任务协作问题,同步队列在任何时刻都只允许一个任务插入或移除元素。 在java.util.concurrent.BlockingQueue接口中提供了这个队列,这个接口有大量的标准实现。你通常可以使用 LinkedBlockingQueue,它是一个无界队列,还可以使用ArrayBlockingQueue,它具有固定的尺寸,因此你可以在它被阻塞之前,向其中放置有限数量的元素。

BlockingQueue的核心方法:

public interface BlockingQueue<E> extends Queue<E> { //将给定元素设置到队列中,如果设置成功返回true, 否则返回false。如果是往限定了长度的队列中设置值,推荐使用offer()方法。 boolean add(E e); //将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。 boolean offer(E e); //将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。 void put(E e) throws InterruptedException; //将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false. boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。 E take() throws InterruptedException; //在给定的时间里,从队列中获取值,时间到了直接调用普通的poll方法,为null则直接返回null。 E poll(long timeout, TimeUnit unit) throws InterruptedException; //获取队列中剩余的空间。 int remainingCapacity(); //从队列中移除指定的值。 boolean remove(Object o); //判断队列中是否拥有该值。 public boolean contains(Object o); //将队列中值,全部移除,并发设置到给定的集合中。 int drainTo(Collection<? super E> c); //指定最多数量限制将队列中值,全部移除,并发设置到给定的集合中。 int drainTo(Collection<? super E> c, int maxElements); }

BlockingQueue 的实现类(待学习…)

ArrayBlockingQueue:LinkedBlockingQueue:DelayQueue:SynchronousQueue: package _07.生产者与消费者队列; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class BlockingQueueTest { public static void main(String[] args) throws Exception { //声明一个容量为10的缓存队列 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(4); //new了三个生产者和一个消费者 Producer producer1 = new Producer(queue); // Producer producer2 = new Producer(queue); // Producer producer3 = new Producer(queue); Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue); Thread thread = new Thread(producer1); //借助Executors ExecutorService exec = Executors.newCachedThreadPool(); //启动线程 exec.execute(producer1); //exec.execute(producer2); //exec.execute(producer3); exec.execute(consumer1); exec.execute(consumer2); //执行5s TimeUnit.SECONDS.sleep(3); //退出Executor,关闭线程 exec.shutdownNow(); } } //生产者 class Producer implements Runnable{ //是否在运行的标志 private volatile boolean isRunning = true; //阻塞队列 private BlockingQueue<String> queue; //自动更新的值(原子类,提供原子操作) private static AtomicInteger count = new AtomicInteger(); private static final int DEFAULT_RANG_FOR_SLEEP = 1000; //构造函数 public Producer(BlockingQueue<String> queue){ this.queue = queue; } @Override public void run() { String data = null; Random r = new Random(); System.out.println("启动生产者线程:" + Thread.currentThread()); try { while(isRunning){ //去0~DEFAULT_RANG_FOR_SLEEP值的一个随机数 Thread.sleep(r.nextInt(DEFAULT_RANG_FOR_SLEEP)); //以原子方式将count当前值加1 data = "data:" + count.incrementAndGet(); System.out.println(Thread.currentThread() +" 将数据:" + data + "放入队列..."); //设定等待的时间为2s,如果超过2s还没加进去返回true if(!queue.offer(data, 2, TimeUnit.SECONDS)){ System.out.println("放入数据失败:" + data); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); }finally{ System.out.println(Thread.currentThread() + " 退出生产者线程!"); } } public void stop(){ isRunning = false; } } class Consumer implements Runnable{ private BlockingQueue<String> queue; private static final int DEFAULT_RANG_FOR_SLEEP = 1000; public Consumer(BlockingQueue<String> queue){ this.queue = queue; } @Override public void run() { System.out.println("启动消费者线程:" + Thread.currentThread()); Random r = new Random(); boolean isRunning = true; try { while(isRunning){ //有数据时直接从队列的队首取走,无数据时阻塞,在2s内有数据,取走,超过2s还没数据,返回失败 String data = queue.poll(2, TimeUnit.SECONDS); if(data != null){ System.out.println(Thread.currentThread() + " 拿到并消费数据:" + data); Thread.sleep(r.nextInt(DEFAULT_RANG_FOR_SLEEP)); }else{ //超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。 isRunning = false; } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); }finally{ System.out.println(Thread.currentThread() + " 退出消费者线程!"); } } }

5.4 死锁

当一下四个条件同时满足时,就会发生死锁。

(1)互斥条件: 任务使用的资源中至少有一个是不能共享的。

(2)请求与保持条件: 已经得到资源的进程可以申请新的资源。

(3)非剥夺条件: 已经分配的资源不能从相应的进程中被抢夺。资源不能被抢占。

(4)循环等待条件: 一个任务等待其他任务所持有的资源,后者又在等待另一个任务所持有的资源,这样一直下去,直到有一个任务在等待第一个任务所持有的资源。(我等你,你等我,两支筷子:你我一人一支,我必须得到你的筷子才能吃,你也一样,否则不能继续下去,只能等待。)

5.5 新类库中的构件

ThreadLocal类


最新回复(0)