比如AtomicInteger/AtomicBoolean/AtomicIntegerArray/AtomicLong… 原子操作类型,其中的每个方法都是原子操作,可以保证原子操作,可以保证线程安全
AtomicInteger count = new AtomicInteger(0);可以和锁混合使用或替代锁的功能,在门闩未完全开放前等待,当门闩完全开方后执行,避免锁的效率底下问题。
CountDownLatch latch = new CountDownLatch(5); // 创建5个门闩latch.await(); // 等待门闩开方latch.countDown(); // 减门闩上的锁将门 闩减完后执行
CountDownLatch latch = new CountDownLatch(5); void m1(){ try { latch.await();// 等待门闩开放。 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("m1() method"); } void m2(){ for(int i = 0; i < 10; i++){ if(latch.getCount() != 0){ System.out.println("latch count : " + latch.getCount()); latch.countDown(); // 减门闩上的锁。 } try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("m2() method : " + i); } } public static void main(String[] args) { final Test_15 t = new Test_15(); new Thread(new Runnable() { @Override public void run() { t.m1(); } }).start(); new Thread(new Runnable() { @Override public void run() { t.m2(); } }).start()重入锁,建议应用的同步方式,相对效率比synchronized高,量级较轻。
Lock lock = new ReentranLock(); 创建 lock.lock(); 加锁 lock.unlock(); **解锁 放在finally中,**手动释放所标记 lock.tryLock(); 尝试锁,如果有锁,无法获取锁标记,返回false;如果获取锁标记,返回true。尝试锁在解除锁标记( lock.unlock())的时候,一定要判断是否获取到锁标记。如果当前线程没有获取到锁标记,会抛出异常。 lock.lockInterruptibly(); 尝试打断,类似于睡眠唤醒,阻塞等待锁,可以被其他的线程打断阻塞状态。 Condition condition= lock.newCondition(); 为Lock增加条件,当条件满足时,做什么事情,如加锁或解锁。condition.await(); 释放当前线程占用的锁,并阻塞当前线程,等待唤醒 condition.signalAll(); 唤醒阻塞状态
普通阻塞 sleep(100000),可以被打断。调用thread.interrupt(),可以打断阻塞状态,抛出异常。 等待队列 wait() 被调用,也是一种阻塞状态,只能有notify唤醒。无法打断。 锁池队列 无法获取锁标记。不是所有的锁池队列都可被打断。 使用ReenrantLock的lock方法,获取锁标记的守护,如果需要阻塞等待锁标记,无法被打断。使用ReenrantLock的lockInterruptibly方法,获取锁标记的时候,如果需要阻塞等待,可以被打断。代替synchronized
Lock lock = new ReentrantLock(); void m1(){ try{ lock.lock(); // 加锁 for(int i = 0; i < 10; i++){ TimeUnit.SECONDS.sleep(1); System.out.println("m1() method " + i); } }catch(InterruptedException e){ e.printStackTrace(); }finally{ lock.unlock(); // 解锁 } } void m2(){ lock.lock(); System.out.println("m2() method"); lock.unlock(); } public static void main(String[] args) { final Test_01 t = new Test_01(); new Thread(new Runnable() { @Override public void run() { t.m1(); } }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(new Runnable() { @Override public void run() { t.m2(); } }).start(); }公平锁*private static *ReentrantLock *lock *= *new *ReentrantLock(true);
在多线程环境中,为每一个线程保存一份线程变量的工具 https://www.cnblogs.com/dolphin0520/p/3920407.html
public T get()public void set(T value)public void remove()protected T initialValue()使用ThreadLocal的时候,一定注意回收资源问题,每个线程结束之前,将当前线程保存的线程变量一定要删除。ThreadLoacl.remove() 避免内存泄漏
题目 自定义容器,提供新增元素(add)和获取元素数量(size)方法。 启动两个线程。线程1向容器中新增10个数据。线程2监听容器元素数量,当容器元素数量为5时,线程2输出信息并终止。
使用volatile,保证t的可见性
public static void main(String[] args) { final volatile List<Object> t = new ArrayList<>(); final Object lock = new Object(); new Thread(new Runnable() { @Override public void run() { for(int i = 0; i < 10; i++){ System.out.println("add Object to Container " + i); t.add(new Object()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable(){ @Override public void run() { while(true){ if(t.size() == 5){ System.out.println("size = 5"); break; } } } }).start(); }使用wait和notify wait是指在一个已经进入了同步锁的线程内,让自己暂时让出同步锁*,以便其他正在等待此锁的线程可以得到同步锁并运行,只有其他线程调用了notify方法(notify并不释放锁,只是告诉调用过wait方法的线程可以去参与获得锁的竞争了),调用wait方法的一个或多个线程就会解除wait状态,重新参与竞争对象锁,程序如果可以再次得到锁,就可以继续向下运行。
public static void main(String[] args) { final volatile List<Object> t = new ArrayList<>(); final Object lock = new Object(); new Thread(new Runnable(){ @Override public void run() { synchronized (lock) { if(t.size() != 5){ try { lock.wait(); // 线程进入等待队列。 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("size = 5"); lock.notifyAll(); // 唤醒其他等待线程 } } }).start(); new Thread(new Runnable() { @Override public void run() { synchronized (lock) { for(int i = 0; i < 10; i++){ System.out.println("add Object to Container " + i); t.add(new Object()); if(t.size() == 5){ lock.notifyAll(); try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } } }).start(); }*使用CountDownLatch 门闩
public static void main(String[] args) { final Container t = new Container(); final CountDownLatch latch = new CountDownLatch(1); new Thread(new Runnable(){ @Override public void run() { if(t.size() != 5){ try { latch.await(); // 等待门闩的开放。 不是进入等待队列 } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("size = 5"); } }).start(); new Thread(new Runnable() { @Override public void run() { for(int i = 0; i < 10; i++){ System.out.println("add Object to Container " + i); t.add(new Object()); if(t.size() == 5){ latch.countDown(); // 门闩-1 } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); }题目 自定义同步容器,容器容量上限为10。可以在多线程中应用,并保证数据线程安全。
wait、notify 都是和while 配合应用的。可以避免多线程并发判断逻辑失效的问题。 使用synchronized
public class TestContainer01<E> { private final LinkedList<E> list = new LinkedList<>(); private final int MAX = 10; private int count = 0; public synchronized int getCount(){ return count; } public synchronized void put(E e){ while(list.size() == MAX){ try { this.wait(); } catch (InterruptedException e1) { e1.printStackTrace(); } } list.add(e); count++; this.notifyAll(); } public synchronized E get(){ E e = null; while(list.size() == 0){ try{ this.wait(); } catch (InterruptedException e1) { e1.printStackTrace(); } } e = list.removeFirst(); count--; this.notifyAll(); return e; } public static void main(String[] args) { 。。。 }}
*使用ReentrantLock
public class TestContainer02<E> { private final LinkedList<E> list = new LinkedList<>(); private final int MAX = 10; private int count = 0; private Lock lock = new ReentrantLock(); private Condition producer = lock.newCondition(); private Condition consumer = lock.newCondition(); public int getCount(){ return count; } public void put(E e){ lock.lock(); try { while(list.size() == MAX){ System.out.println(Thread.currentThread().getName() + " 等待。。。"); // 进入等待队列。释放锁标记。 // 借助条件,进入的等待队列。 producer.await(); } System.out.println(Thread.currentThread().getName() + " put 。。。"); list.add(e); count++; // 借助条件,唤醒所有的消费者。 consumer.signalAll(); } catch (InterruptedException e1) { e1.printStackTrace(); } finally { lock.unlock(); } } public E get(){ E e = null; lock.lock(); try { while(list.size() == 0){ System.out.println(Thread.currentThread().getName() + " 等待。。。"); // 借助条件,消费者进入等待队列 consumer.await(); } System.out.println(Thread.currentThread().getName() + " get 。。。"); e = list.removeFirst(); count--; // 借助条件,唤醒所有的生产者 producer.signalAll(); } catch (InterruptedException e1) { e1.printStackTrace(); } finally { lock.unlock(); } return e; } public static void main(String[] args) { 。。。 } }解决并发情况下的容器线程安全问题的。给多线程准备一个线程安全的容器对象。 线程安全的容器对象,都是使用synchronized方法实现,类似native,java8使用CAS
VecrorHashtable等concurrent包中的同步容器,大多是使用系统底层技术实现的线程安全。
底层哈希实现的同步 Map(Set)。效率高,线程安全。使用系统底层技术实现线程安全。量级较 synchronized 低。 key 和 value 不能为 null。
底层跳表(SkipList) 实现的同步 Map(Set)。有序,效率比 ConcurrentHashMap 稍低。
写时复制集合。写入效率低,读取效率高。每次写入数据,都会创建一个新的底层数组。 牺牲空间实现线程安全。 适用于写少读多的情况。
基础链表同步队列。
阻塞队列,队列容量不足自动阻塞,队列容量为 0 自动阻塞。
put 自动阻塞,队列容量满后,自动阻塞take 自动阻塞方法,队列容量为0后,自动阻塞底层数组实现的有界队列。 自动阻塞。根据调用 API(add/put/offer) 不同,有不同特性。 当容量不足的时候,有阻塞能力。
add 方法在容量不足的时候,抛出异常。put 方法在容量不足的时候,阻塞等待。offer 方法, 单参数 offer 方法,不阻塞。容量不足的时候,返回 false。当前新增数据操作放弃。三参数 offer 方法(offer(value,times,timeunit)),容量不足的时候,阻塞 times 时长(单位为 timeunit),如果在阻塞时长内,有容量空闲,新增数据返回 true。如果阻塞时长范围内,无容量空闲,放弃新增数据,返回 false。延时队列。根据比较机制,实现自定义处理顺序的队列。 常用于定时任务。 如:定时关机。
转移队列, 使用 transfer 方法,实现数据的即时处理。没有消费者,就阻塞。
add 队列会保存数据,不做阻塞等待 transfer 是TransferQueue特有的方法,必须有消费者(take()方法的调用者)。如果没有任意线程消费数据,transfer方法阻塞。一般用于处理即时消息。同步队列,是一个容量为 0 的队列。 是一个特殊的 TransferQueue。 必须现有消费线程等待,才能使用的队列。
add 方法,无阻塞。若没有消费线程阻塞等待数据,则抛出异常。put 方法,有阻塞。若没有消费线程阻塞等待数据,则阻塞。容量固定的线程池。活动状态和线程池容量是有上限的线程池。 所有的线程池中,都有一个任务队列。使用的是** BlockingQueue作为任务的载体。 当任务数量大于线程池容量的时候,没有运行的任务保存在任务队列中,当线程有空闲的,自动从队列中取出任务执行。 使用场景: 大多数情况下,使用的线程池,首选推荐 FixedThreadPool。 OS 系统和硬件是有线程支持上限。不能随意的无限制提供线程池。 线程池默认的容量上限**是 Integer.MAX_VALUE。 常见的线程池容量: PC - 200。 服务器 - 1000~10000 queued tasks - 任务队列 completed tasks - 结束任务队列
ExecutorService service = Executors.newFixedThreadPool(5); for(int i = 0; i < 6; i++){ service.execute(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor"); } }); }缓存的线程池。容量不限(Integer.MAX_VALUE)。自动扩容。 容量管理策略:如果线程池中的线程数量不满足任务执行,创建新的线程。每次有新任务无法即时处理的时候,都会创建新的线程。 当线程池中的线程空闲时长达到一定的临界值(默认 60 秒),自动释放线程。 默认线程空闲** 60 秒**,自动销毁。 应用场景: 内部应用或测试应用。 内部应用,有条件的内部数据瞬间处理时应用,如: 电信平台夜间执行数据整理(有把握在短时间内处理完所有工作,且对硬件和软件有足够的 信心)。 测试应用 ,在测试的时候 ,尝试得到硬件或软件的最高负载量 ,用于提供 FixedThreadPool 容量的指导。
ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for(int i = 0; i < 5; i++){ service.execute(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor"); } }); }计划任务线程池。可以根据计划自动执行任务的线程池。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit) **runnable **- 要执行的任务。 **start_limit **- 第一次任务执行的间隔。 **limit **- 多次任务执行的间隔。 **timeunit **- 多次任务执行间隔的时间单位。 使用场景: 计划任务时选用(DelaydQueue),如:电信行业中的数据整理,没分钟整理,没消失整理,每天整理等。
ScheduledExecutorService service = Executors.newScheduledThreadPool(3); System.out.println(service); // 定时完成任务。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit) // runnable - 要执行的任务。 service.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } }, 0, 300, TimeUnit.MILLISECONDS);单一容量的线程池。 使用场景: 保证任务顺序时使用。 如: 游戏大厅中的公共频道聊天。 秒杀。
ExecutorService service = Executors.newSingleThreadExecutor(); System.out.println(service); for(int i = 0; i < 5; i++){ service.execute(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor"); } }); }分支合并线程池(mapduce 类似的设计思想)。适合用于处理复杂任务。 初始化线程容量与 CPU 核心数相关。 线程池中运行的内容必须是 ForkJoinTask 的子类型(RecursiveTask,RecursiveAction)。**ForkJoinPool - 分支合并线程池。 可以递归完成复杂任务。 要求可分支合并的任务必须是ForkJoinTask **类型的子类型。 其中提供了分支和合并的能力。 ForkJoinTask 类型提供了两个抽象子类型, **RecursiveTask **有返回结果的分支合并任务,**RecursiveAction **无返回结果的分支合并任务。(Callable/Runnable) compute 方法:就是任务的执行逻辑。 **ForkJoinPool **没有所谓的容量。默认都是 1 个线程。根据任务自动的分支新的子线程。 当子线程任务结束后,自动合并。 所谓自动是根据 fork 和 join 两个方法实现的。 应用: 主要是做科学计算或天文计算的。 数据分析的。
public class Test_ForkJoinPool { final static int[] numbers = new int[1000000]; final static int MAX_SIZE = 50000; final static Random r = new Random(); static { for (int i = 0; i < numbers.length; i++) { numbers[i] = r.nextInt(1000); } } static class AddTask extends RecursiveTask<Long> { // RecursiveAction int begin, end; public AddTask(int begin, int end) { this.begin = begin; this.end = end; } @Override protected Long compute() { if ((end - begin) < MAX_SIZE) { long sum = 0L; for (int i = begin; i < end; i++) { sum += numbers[i]; } // System.out.println("form " + begin + " to " + end + " sum is : " + sum); return sum; } else { int middle = begin + (end - begin) / 2; AddTask task1 = new AddTask(begin, middle); AddTask task2 = new AddTask(middle, end); task1.fork();// 就是用于开启新的任务的。 就是分支工作的。 就是开启一个新的线程任务。 task2.fork(); // join - 合并。将任务的结果获取。 这是一个阻塞方法。一定会得到结果数据。 return task1.join() + task2.join(); } } } public static void main(String[] args) throws InterruptedException, ExecutionException, IOException { long result = 0L; for (int i = 0; i < numbers.length; i++) { result += numbers[i]; } System.out.println(result); ForkJoinPool pool = new ForkJoinPool(); AddTask task = new AddTask(0, numbers.length); Future<Long> future = pool.submit(task); System.out.println(future.get()); } }线程池底层实现。 除 ForkJoinPool 外,其他常用线程池底层都是使用 ThreadPoolExecutor实现的。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) int corePoolSize 核心容量,创建线程池的时候,默认有多少线程。 也是线程池保持的最少线程数 int maximumPoolSize 最大容量,线程池最多有多少线程 long keepAliveTime 生命周期, 0 为永久。当线程空闲多久后,自动回收。 TimeUnit unit 生命周期单位,为生命周期提供单位,如:秒,毫秒 BlockingQueue workQueue 任务队列,阻塞队列。 注意,泛型必须是使用场景: 默认提供的线程池不满足条件时使用。如:初始线程数据 4,最大线程数 200,线程空闲周期 30 秒。 推荐手动创建线程池。
// 模拟fixedThreadPool, 核心线程5个,最大容量5个,线程的生命周期无限。 ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); for(int i = 0; i < 6; i++){ service.execute(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " - test executor"); } }); }