并发编程基础、同步容器、线程池

it2022-05-05  145

文章目录

基础关键字synchronizedvolatile 相关类AtomicXXXCountDownLatchReentrantLockThreadLocal 实例1 监听容器数量2 生产者 消费者 同步容器Map/SetConcurrentHashMap/ConcurrentHshSetConcurrentSkipListMap/ConcurrentSkipListSet ListCopyOnWriteArrayList QueueConcurrentLinkedQueueLinkedBlockingQueueArrayBlockingQueueDelayQueueLinkedTransferQueueSynchronusQueue 线程池相关类/接口线程池状态FixedThreadPoolCashedThreadPoolScheduledThreadPoolSingleThreadExecutorForkJoinPool*ThreadPoolExecutor

基础

关键字

synchronized

锁的是对象。可能锁对象包括: this, 临界资源对象, Class 类对象。 synchronized(this)和synchronized方法都是锁当前对象。静态同步方法,锁的是当前类型的类对象。 加锁的目的: 就是为了保证操作的原子性。同步方法只影响锁定同一个锁对象的同步方法。不影响其他线程调用非同步方法,或调用其他锁资源的同步方法。同步方法只能保证当前方法的原子性,不能保证多个业务方法之间的互相访问的原子性。注意在商业开发中,多方法要求结果访问原子操作,需要多个方法都加锁,且锁定同一个资源。同一个线程,多次调用同步代码,锁定同一个锁对象,可重入。在不同线程中,不可重入。子类同步方法覆盖父类同步方法,可以指定调用父类的同步方法。相当于锁的重入。在同步方法中发生异常的时候,自动施放锁资源,不会影响其他线程执行。(在同步业务逻辑中,在catch中处理)锁,锁的是对象,不是引用。(后 重置对象,不会影响线程) 同步代码一旦加锁后,那么会有一个临时的锁引用执行锁对象,和真实的引用无直接关联在锁未释放前,修改锁对象引用,不会影响同步代码的执行

volatile

通知OS操作系统底层,在CPU计算过程中,都要检查内存中数据的有效性,保证最新内存数据被使用。保证可见性,但不能保证原子性。

相关类

AtomicXXX

比如AtomicInteger/AtomicBoolean/AtomicIntegerArray/AtomicLong… 原子操作类型,其中的每个方法都是原子操作,可以保证原子操作,可以保证线程安全

AtomicInteger count = new AtomicInteger(0);

CountDownLatch

可以和锁混合使用或替代锁的功能,在门闩未完全开放前等待,当门闩完全开方后执行,避免锁的效率底下问题。

CountDownLatch latch = new CountDownLatch(5); // 创建5个门闩latch.await(); // 等待门闩开方latch.countDown(); // 减门闩上的锁

将门 闩减完后执行

CountDownLatch latch = new CountDownLatch(5); void m1(){ try { latch.await();// 等待门闩开放。 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("m1() method"); } void m2(){ for(int i = 0; i < 10; i++){ if(latch.getCount() != 0){ System.out.println("latch count : " + latch.getCount()); latch.countDown(); // 减门闩上的锁。 } try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("m2() method : " + i); } } public static void main(String[] args) { final Test_15 t = new Test_15(); new Thread(new Runnable() { @Override public void run() { t.m1(); } }).start(); new Thread(new Runnable() { @Override public void run() { t.m2(); } }).start()

ReentrantLock

重入锁,建议应用的同步方式,相对效率比synchronized高,量级较轻。

Lock lock = new ReentranLock(); 创建 lock.lock(); 加锁 lock.unlock(); **解锁 放在finally中,**手动释放所标记 lock.tryLock(); 尝试锁,如果有锁,无法获取锁标记,返回false;如果获取锁标记,返回true。尝试锁在解除锁标记( lock.unlock())的时候,一定要判断是否获取到锁标记。如果当前线程没有获取到锁标记,会抛出异常。 lock.lockInterruptibly(); 尝试打断,类似于睡眠唤醒,阻塞等待锁,可以被其他的线程打断阻塞状态。 Condition condition= lock.newCondition(); 为Lock增加条件,当条件满足时,做什么事情,如加锁或解锁。condition.await(); 释放当前线程占用的锁,并阻塞当前线程,等待唤醒 condition.signalAll(); 唤醒

阻塞状态

普通阻塞 sleep(100000),可以被打断。调用thread.interrupt(),可以打断阻塞状态,抛出异常。 等待队列 wait() 被调用,也是一种阻塞状态,只能有notify唤醒。无法打断。 锁池队列 无法获取锁标记。不是所有的锁池队列都可被打断。 使用ReenrantLock的lock方法,获取锁标记的守护,如果需要阻塞等待锁标记,无法被打断。使用ReenrantLock的lockInterruptibly方法,获取锁标记的时候,如果需要阻塞等待,可以被打断。

代替synchronized

Lock lock = new ReentrantLock(); void m1(){ try{ lock.lock(); // 加锁 for(int i = 0; i < 10; i++){ TimeUnit.SECONDS.sleep(1); System.out.println("m1() method " + i); } }catch(InterruptedException e){ e.printStackTrace(); }finally{ lock.unlock(); // 解锁 } } void m2(){ lock.lock(); System.out.println("m2() method"); lock.unlock(); } public static void main(String[] args) { final Test_01 t = new Test_01(); new Thread(new Runnable() { @Override public void run() { t.m1(); } }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(new Runnable() { @Override public void run() { t.m2(); } }).start(); }

公平锁*private static *ReentrantLock *lock *= *new *ReentrantLock(true);

ThreadLocal

在多线程环境中,为每一个线程保存一份线程变量的工具 https://www.cnblogs.com/dolphin0520/p/3920407.html

public T get()public void set(T value)public void remove()protected T initialValue()

使用ThreadLocal的时候,一定注意回收资源问题,每个线程结束之前,将当前线程保存的线程变量一定要删除。ThreadLoacl.remove() 避免内存泄漏

实例

1 监听容器数量

题目 自定义容器,提供新增元素(add)和获取元素数量(size)方法。 启动两个线程。线程1向容器中新增10个数据。线程2监听容器元素数量,当容器元素数量为5时,线程2输出信息并终止。

使用volatile,保证t的可见性

public static void main(String[] args) { final volatile List<Object> t = new ArrayList<>(); final Object lock = new Object(); new Thread(new Runnable() { @Override public void run() { for(int i = 0; i < 10; i++){ System.out.println("add Object to Container " + i); t.add(new Object()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable(){ @Override public void run() { while(true){ if(t.size() == 5){ System.out.println("size = 5"); break; } } } }).start(); }

使用wait和notify wait是指在一个已经进入了同步锁的线程内,让自己暂时让出同步锁*,以便其他正在等待此锁的线程可以得到同步锁并运行,只有其他线程调用了notify方法(notify并不释放锁,只是告诉调用过wait方法的线程可以去参与获得锁的竞争了),调用wait方法的一个或多个线程就会解除wait状态,重新参与竞争对象锁,程序如果可以再次得到锁,就可以继续向下运行。

public static void main(String[] args) { final volatile List<Object> t = new ArrayList<>(); final Object lock = new Object(); new Thread(new Runnable(){ @Override public void run() { synchronized (lock) { if(t.size() != 5){ try { lock.wait(); // 线程进入等待队列。 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("size = 5"); lock.notifyAll(); // 唤醒其他等待线程 } } }).start(); new Thread(new Runnable() { @Override public void run() { synchronized (lock) { for(int i = 0; i < 10; i++){ System.out.println("add Object to Container " + i); t.add(new Object()); if(t.size() == 5){ lock.notifyAll(); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } } }).start(); }

*使用CountDownLatch 门闩

public static void main(String[] args) { final Container t = new Container(); final CountDownLatch latch = new CountDownLatch(1); new Thread(new Runnable(){ @Override public void run() { if(t.size() != 5){ try { latch.await(); // 等待门闩的开放。 不是进入等待队列 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("size = 5"); } }).start(); new Thread(new Runnable() { @Override public void run() { for(int i = 0; i < 10; i++){ System.out.println("add Object to Container " + i); t.add(new Object()); if(t.size() == 5){ latch.countDown(); // 门闩-1 } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); }

2 生产者 消费者

题目 自定义同步容器,容器容量上限为10。可以在多线程中应用,并保证数据线程安全。

wait、notify 都是和while 配合应用的。可以避免多线程并发判断逻辑失效的问题。 使用synchronized

public class TestContainer01<E> { private final LinkedList<E> list = new LinkedList<>(); private final int MAX = 10; private int count = 0; public synchronized int getCount(){ return count; } public synchronized void put(E e){ while(list.size() == MAX){ try { this.wait(); } catch (InterruptedException e1) { e1.printStackTrace(); } } list.add(e); count++; this.notifyAll(); } public synchronized E get(){ E e = null; while(list.size() == 0){ try{ this.wait(); } catch (InterruptedException e1) { e1.printStackTrace(); } } e = list.removeFirst(); count--; this.notifyAll(); return e; } public static void main(String[] args) { 。。。 }

}

*使用ReentrantLock

public class TestContainer02<E> { private final LinkedList<E> list = new LinkedList<>(); private final int MAX = 10; private int count = 0; private Lock lock = new ReentrantLock(); private Condition producer = lock.newCondition(); private Condition consumer = lock.newCondition(); public int getCount(){ return count; } public void put(E e){ lock.lock(); try { while(list.size() == MAX){ System.out.println(Thread.currentThread().getName() + " 等待。。。"); // 进入等待队列。释放锁标记。 // 借助条件,进入的等待队列。 producer.await(); } System.out.println(Thread.currentThread().getName() + " put 。。。"); list.add(e); count++; // 借助条件,唤醒所有的消费者。 consumer.signalAll(); } catch (InterruptedException e1) { e1.printStackTrace(); } finally { lock.unlock(); } } public E get(){ E e = null; lock.lock(); try { while(list.size() == 0){ System.out.println(Thread.currentThread().getName() + " 等待。。。"); // 借助条件,消费者进入等待队列 consumer.await(); } System.out.println(Thread.currentThread().getName() + " get 。。。"); e = list.removeFirst(); count--; // 借助条件,唤醒所有的生产者 producer.signalAll(); } catch (InterruptedException e1) { e1.printStackTrace(); } finally { lock.unlock(); } return e; } public static void main(String[] args) { 。。。 } }

同步容器

解决并发情况下的容器线程安全问题的。给多线程准备一个线程安全的容器对象。 线程安全的容器对象,都是使用synchronized方法实现,类似native,java8使用CAS

VecrorHashtable等

concurrent包中的同步容器,大多是使用系统底层技术实现的线程安全。

Map/Set

ConcurrentHashMap/ConcurrentHshSet

底层哈希实现的同步 Map(Set)。效率高,线程安全。使用系统底层技术实现线程安全。量级较 synchronized 低。 key 和 value 不能为 null。

ConcurrentSkipListMap/ConcurrentSkipListSet

底层跳表(SkipList) 实现的同步 Map(Set)。有序,效率比 ConcurrentHashMap 稍低。

List

CopyOnWriteArrayList

写时复制集合。写入效率低,读取效率高。每次写入数据,都会创建一个新的底层数组。 牺牲空间实现线程安全。 适用于写少读多的情况。

Queue

ConcurrentLinkedQueue

基础链表同步队列。

LinkedBlockingQueue

阻塞队列,队列容量不足自动阻塞,队列容量为 0 自动阻塞。

put 自动阻塞,队列容量满后,自动阻塞take 自动阻塞方法,队列容量为0后,自动阻塞

ArrayBlockingQueue

底层数组实现的有界队列。 自动阻塞。根据调用 API(add/put/offer) 不同,有不同特性。 当容量不足的时候,有阻塞能力。

add 方法在容量不足的时候,抛出异常。put 方法在容量不足的时候,阻塞等待。offer 方法, 单参数 offer 方法,不阻塞。容量不足的时候,返回 false。当前新增数据操作放弃。三参数 offer 方法(offer(value,times,timeunit)),容量不足的时候,阻塞 times 时长(单位为 timeunit),如果在阻塞时长内,有容量空闲,新增数据返回 true。如果阻塞时长范围内,无容量空闲,放弃新增数据,返回 false。

DelayQueue

延时队列。根据比较机制,实现自定义处理顺序的队列。 常用于定时任务。 如:定时关机。

LinkedTransferQueue

转移队列, 使用 transfer 方法,实现数据的即时处理。没有消费者,就阻塞。

add 队列会保存数据,不做阻塞等待 transfer 是TransferQueue特有的方法,必须有消费者(take()方法的调用者)。如果没有任意线程消费数据,transfer方法阻塞。一般用于处理即时消息。

SynchronusQueue

同步队列,是一个容量为 0 的队列。 是一个特殊的 TransferQueue。 必须现有消费线程等待,才能使用的队列。

add 方法,无阻塞。若没有消费线程阻塞等待数据,则抛出异常。put 方法,有阻塞。若没有消费线程阻塞等待数据,则阻塞。

线程池

相关类/接口

Executor 一种处理机制线程池顶级接口。 定义方法, void execute(Runnable)。常用方法 - void execute(Runnable)作用是: 启动线程任务的。 ExecutorService 线程池服务类型,所有的线程池类型都实现这个接口。实现这个接口代表可以提供线程池能力。void execute(Runnable)Future submit(Calleable)Future submit(Runnable)void shutdown(); 优雅关闭,不是强行关闭线程池,回收线程池中的资源。而是不再处理新的任务,将已接收的任务处理完毕后再关闭。 boolean isTerminated(); 是否已经结束,相当于回收了资源 boolean isShudown(); 是否已经关闭,是否调用过shutdown方法 Executors Executor的工具类。类似Collections。可以更简单的创建若干种线程池。 Future 未来结果,代表线程任务执行结束后的结果。T get() 阻塞等待线程执行结束并得到结果T get(long,TimeUnit) 阻塞固定时长,等待线程执行结束后的结果 Callable 可执行接口,类似Runnable接口,可以启动一个线程的接口Object call() 相当于Runnable接口中的run方法,区别为此方法有返回值

线程池状态

Running 线程池正在执行中,活动状态 ShuttingDown 线程池正在关闭过程中。优雅关闭。一旦进入这个状态,线程池不再接受新的任务,处理所有已经接受的任务,处理完毕后,关闭线程池。 Terminated 线程池已经关闭。

FixedThreadPool

容量固定的线程池。活动状态和线程池容量是有上限的线程池。 所有的线程池中,都有一个任务队列。使用的是** BlockingQueue作为任务的载体。 当任务数量大于线程池容量的时候,没有运行的任务保存在任务队列中,当线程有空闲的,自动从队列中取出任务执行。 使用场景: 大多数情况下,使用的线程池,首选推荐 FixedThreadPool。 OS 系统和硬件是有线程支持上限。不能随意的无限制提供线程池。 线程池默认的容量上限**是 Integer.MAX_VALUE。 常见的线程池容量: PC - 200。 服务器 - 1000~10000 queued tasks - 任务队列 completed tasks - 结束任务队列

ExecutorService service = Executors.newFixedThreadPool(5); for(int i = 0; i < 6; i++){ service.execute(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor"); } }); }

CashedThreadPool

缓存的线程池。容量不限(Integer.MAX_VALUE)。自动扩容。 容量管理策略:如果线程池中的线程数量不满足任务执行,创建新的线程。每次有新任务无法即时处理的时候,都会创建新的线程。 当线程池中的线程空闲时长达到一定的临界值(默认 60 秒),自动释放线程。 默认线程空闲** 60 秒**,自动销毁。 应用场景: 内部应用或测试应用。 内部应用,有条件的内部数据瞬间处理时应用,如: 电信平台夜间执行数据整理(有把握在短时间内处理完所有工作,且对硬件和软件有足够的 信心)。 测试应用 ,在测试的时候 ,尝试得到硬件或软件的最高负载量 ,用于提供 FixedThreadPool 容量的指导。

ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for(int i = 0; i < 5; i++){ service.execute(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor"); } }); }

ScheduledThreadPool

计划任务线程池。可以根据计划自动执行任务的线程池。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit) **runnable **- 要执行的任务。 **start_limit **- 第一次任务执行的间隔。 **limit **- 多次任务执行的间隔。 **timeunit **- 多次任务执行间隔的时间单位。 使用场景: 计划任务时选用(DelaydQueue),如:电信行业中的数据整理,没分钟整理,没消失整理,每天整理等。

ScheduledExecutorService service = Executors.newScheduledThreadPool(3); System.out.println(service); // 定时完成任务。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit) // runnable - 要执行的任务。 service.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } }, 0, 300, TimeUnit.MILLISECONDS);

SingleThreadExecutor

单一容量的线程池。 使用场景: 保证任务顺序时使用。 如: 游戏大厅中的公共频道聊天。 秒杀。

ExecutorService service = Executors.newSingleThreadExecutor(); System.out.println(service); for(int i = 0; i < 5; i++){ service.execute(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor"); } }); }

ForkJoinPool

分支合并线程池(mapduce 类似的设计思想)。适合用于处理复杂任务。 初始化线程容量与 CPU 核心数相关。 线程池中运行的内容必须是 ForkJoinTask 的子类型(RecursiveTask,RecursiveAction)。**ForkJoinPool - 分支合并线程池。 可以递归完成复杂任务。 要求可分支合并的任务必须是ForkJoinTask **类型的子类型。 其中提供了分支和合并的能力。 ForkJoinTask 类型提供了两个抽象子类型, **RecursiveTask **有返回结果的分支合并任务,**RecursiveAction **无返回结果的分支合并任务。(Callable/Runnable) compute 方法:就是任务的执行逻辑。 **ForkJoinPool **没有所谓的容量。默认都是 1 个线程。根据任务自动的分支新的子线程。 当子线程任务结束后,自动合并。 所谓自动是根据 fork 和 join 两个方法实现的。 应用: 主要是做科学计算或天文计算的。 数据分析的。

public class Test_ForkJoinPool { final static int[] numbers = new int[1000000]; final static int MAX_SIZE = 50000; final static Random r = new Random(); static { for (int i = 0; i < numbers.length; i++) { numbers[i] = r.nextInt(1000); } } static class AddTask extends RecursiveTask<Long> { // RecursiveAction int begin, end; public AddTask(int begin, int end) { this.begin = begin; this.end = end; } @Override protected Long compute() { if ((end - begin) < MAX_SIZE) { long sum = 0L; for (int i = begin; i < end; i++) { sum += numbers[i]; } // System.out.println("form " + begin + " to " + end + " sum is : " + sum); return sum; } else { int middle = begin + (end - begin) / 2; AddTask task1 = new AddTask(begin, middle); AddTask task2 = new AddTask(middle, end); task1.fork();// 就是用于开启新的任务的。 就是分支工作的。 就是开启一个新的线程任务。 task2.fork(); // join - 合并。将任务的结果获取。 这是一个阻塞方法。一定会得到结果数据。 return task1.join() + task2.join(); } } } public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { long result = 0L; for (int i = 0; i < numbers.length; i++) { result += numbers[i]; } System.out.println(result); ForkJoinPool pool = new ForkJoinPool(); AddTask task = new AddTask(0, numbers.length); Future<Long> future = pool.submit(task); System.out.println(future.get()); } }

*ThreadPoolExecutor

线程池底层实现。 除 ForkJoinPool 外,其他常用线程池底层都是使用 ThreadPoolExecutor实现的。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) int corePoolSize 核心容量,创建线程池的时候,默认有多少线程。 也是线程池保持的最少线程数 int maximumPoolSize 最大容量,线程池最多有多少线程 long keepAliveTime 生命周期, 0 为永久。当线程空闲多久后,自动回收。 TimeUnit unit 生命周期单位,为生命周期提供单位,如:秒,毫秒 BlockingQueue workQueue 任务队列,阻塞队列。 注意,泛型必须是

使用场景: 默认提供的线程池不满足条件时使用。如:初始线程数据 4,最大线程数 200,线程空闲周期 30 秒。 推荐手动创建线程池。

// 模拟fixedThreadPool, 核心线程5个,最大容量5个,线程的生命周期无限。 ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); for(int i = 0; i < 6; i++){ service.execute(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor"); } }); }

最新回复(0)