核心技术:并发
什么是线程
一个程序同时执行多个任务,每一个任务称为一个线程(thread), 它是线程控制的简称。
- 进程与线程有的区别,在于每个进程拥有自己的一整套变量, 而线程则共享数据。
- 共享变量使线程之间的通信比进程之间的通信更有效、更容易。
使用线程
实现线程的两种方式:
- 实现Runnable接口,利用接口构造Thread对象;(Runnable 是一个函数式接口,可以用lambda 表达式)
Runnable r = () -> { /*task code*/ }; Thread t = new Thread(r);
- 继承Thread类,构造一个子类的对象;
class MyThread extends Thread { public void run() { //task code } }
- 不要调用Thread类或Runnable对象的run方法,而应该调用Thread.start方法。
- 直接调用run 方法,只会执行同一个线程中的任务,而不会启动新线程。
- 如果有很多任务, 要为每个任务创建一个独立的线程所付出的代价太大了。可以使用线程池来解决这个问题
相关方法:
java.Iang.Thread | |
---|---|
Thread( Runnable target ) | 构造一个新线程, 用于调用给定目标的nm() 方法。 |
void start( ) | 启动这个线程, 将引发调用mn() 方法。这个方法将立即返回, 并且新线程将并发运行。 |
void run( ) | 调用关联 Runnable 的run 方法。 |
static void sleep(long minis) | 休眠给定的毫秒数。参数:millis 休眠的毫秒数 |
java.lang.Runnable | |
void run( ) | 必须覆盖这个方法, 并在这个方法中提供所要执行的任务指令。 |
中断线程
当线程的run 方法执行方法体中最后一条语句后,并经由执行return 语句返冋时,或者出现了在方法中没有捕获的异常时,线程将终止。
- 在Java 的早期版本中,还有一个stop方法,其他线程可以调用它终止线程。
- 因为,stop会瞬间强行停止一个线程,
且该线程持有的锁并不能释放;所以,这个方法现在已经被弃用了。
- 因为,stop会瞬间强行停止一个线程,
相关方法:
java.Iang.Thread | |
---|---|
void interrupt() | 向线程发送中断请求。线程的中断状态将被设置为true。如果目前该线程被一个sleep调用阻塞,那么, InterruptedException 异常被抛出。 |
static boolean interrupted() | 测试当前线程(即正在执行这一命令的线程)是否被中断。注意,这是一个静态方法。这一调用会产生副作用—它将当前线程的中断状态重置为false。 |
boolean islnterrupted() | 测试线程是否被终止。不像静态的中断方法,这一调用不改变线程的中断状态。 |
static Thread currentThread() | 返回代表当前执行线程的Thread 对象。 |
线程状态
线程可以有如下6 种状态:
- New ( 新创建)
- Runnable (可运行)
- Blocked ( 被阻塞)
- Waiting ( 等待)
- Timed waiting (计时等待)
- Terminated ( 被终止)
- 可调用“getState”方法获取一个线程的当前状态;
新创建线程
用 new 操作符创建一个新线程,未调用 start 方法,该线程还没有开始运行,即 New 状态
可运行线程
一旦调用 start 方法,则线程处于 runnable 状态。
- (Java 的规范说明没有将它作为一个单独状态:正在运行中的线程仍然处于可运行状态。)
结合线程状态图,理解为:“可运行” = “运行” + “就绪”;
被阻塞线程和等待线程
当线程处于被阻塞或等待状态时,它暂时不活动。它不运行任何代码且消耗最少的资源。直到线程调度器重新激活它。
- (看到里有点混,于是Java的阻塞、等待及相关方法)
阻塞(blocked)
阻塞:当一个线程请求锁,而该资源被其他线程持有时,该线程被置为阻塞状态。
- 请求内部的对象锁(非java.util.concurrent库中的锁)。
等待(waiting)
等待:当线程等待另一个线程通知调度器一个条件时,该线程主动进入等待状态。
如:
- Object.wait
- Thread.join
- java.util.concurrent 库中的Lock 或Condition
计时等待(timed waiting)
计时等待状态将一直保持到超时期满或者接收到适当的通知。
如:
- Object.wait(long millis)
- Thread.join(long millis)
- java.util.concurrent 库中的Lock(long millis) 或Condition(long millis)
被终止的线程(Terminated)
线程因如下两个原因之一而被终止:
- 因为run 方法正常退出而自然死亡。
- 因为一个没有捕获的异常终止了nm 方法而意外死亡。
- 不应该使用“stop”方法来终止线程!(虽然能做到)
相关方法:
java.iang.Thread | |
---|---|
void join() | 等待指定线程的终止。【在当前线程调用“s.join();”,则必须等待线程s执行完毕,当前线程才能继续执行】 |
void join(long millis) | 等待指定的线程死亡或者经过指定的毫秒数。 |
Thread.State getState() | 得到这一线程的状态;NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING 或 TERMINATED 之一。 |
void stop() | 停止该线程。这一方法已过时。 |
void suspend() | 暂停这一线程的执行。这一方法已过时。 |
void resume() | 恢复线程。这一方法仅仅在调用suspend() 之后调用。这一方法已过时。 |
线程属性
线程的各种属性,如:线程优先级、守护线程、线程组以及处理未捕获异常的处理器。
private int priority;
...
private boolean daemon = false;
...
private ThreadGroup group;
public final static int MIN_PRIORITY = 1;
public final static int NORM_PRIORITY = 5;
public final static int MAX_PRIORITY = 10;
...
// null unless explicitly set
private volatile UncaughtExceptionHandler uncaughtExceptionHandler;
// null unless explicitly set
private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;
线程优先级(Priority)
- 默认情况下,线程会继承它的父线程的优先级;(可以在 init 方法中发现,同时还会继承 “group”、“daemon”)
- 可以用setPriority 方法提高或降低任何一个线程的优先级。
- 不要将程序构建为功能的正确性依赖于优先级。
如果确实要使用优先级, 应该避免初学者常犯的一个错误。如果有几个高优先级的线程没有进入非活动状态, 低优先级的线程可能永远也不能执行。每当调度器决定运行一个新线程时, 首先会在具有高优先级的线程中进行选择, 尽管这样会使低优先级的线程完全饿死。
【???不用动态优先级吗,还是和底层实现有关?】
相关方法:
java.lang.Thread | |
---|---|
void setPriority(int newPriority) | 设置线程的优先级。优先级必须在Thread.MIN_PRIORITY 与Thread.MAX_PRIORITY之间。一般使用Thread.NORMJ»RIORITY 优先级。 |
static int MIN_PRIORITY | 线程的最小优先级。最小优先级的值为1。 |
static int N0RM_PRI0RITY | 线程的默认优先级。默认优先级为5。 |
static int MAX—PRIORITY | 线程的最高优先级。最高优先级的值为10。 |
static void yield( ) | 导致当前执行线程处于让步状态。如果有其他的可运行线程具有至少与此线程同样高的优先级,那么这些线程接下来会被调度。注意,这是一个静态方法。【让出CPU】 |
守护线程(Daemon)
- 守护线程的唯一用途是为其他线程提供服务,当只剩下守护线程时,虚拟机就退出了。
- 可以通过调用“t .setDaemon(true);”将线程转换为守护线程(daemon thread)。
- 守护线程应该永远不去访问固有资源, 如文件、数据库,因为它会在任何时候甚至在一个操作的中间发生中断。
相关方法:
java.lang.Thread | |
---|---|
void setDaemon( boolean isDaemon ) | 标识该线程为守护线程或用户线程。这一方法必须在线程启动之前调用。 |
线程组(ThreadGroup)
线程组是一个可以统一管理的线程集合。默认情况下,创建的所有线程属于相同的线程组,但是,也可能会建立其他的组。现在引入了更好的特性用于线程集合的操作,所以建议不要在自己的程序中使用线程组。
相关方法:
java.lang.ThreadGroup | |
---|---|
void UncaughtException( Thread t, Throwable e) | 如果有父线程组,调用父线程组的这一方法; 或者,如果Thread 类有默认处理器,调用该处理器,否则,输出栈轨迹到标准错误流上(但是,如果 e 是一个ThreadDeath对象,栈轨迹是被禁用的。ThreadDeath 对象由 stop 方法产生,而该方法已经过时)。 |
未捕获异常处理器(uncaughtExceptionHandler)
- 线程的run 方法不能抛出任何受查异常。非受査异常会导致线程终止,在线程死亡之前,异常被传递到一个用于未捕获异常的处理器。
- 该处理器必须属于一个实现Thread.UncaughtExceptionHandler接口的类,这个接口只有—个方法“void uncaughtException(Thread t, Throwable e);”
相关方法:
java.lang.Thread | |
---|---|
static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler ) | 设置或获取未捕获异常的默认处理器。 |
static Thread.UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() | |
void setUncaughtExceptionHandler( Thread.UncaughtExceptionHandlerhandler ) | 设置或获取未捕获异常的处理器。如果没有安装处理器, 则将线程组对象作为处理器。 |
Thread.UncaughtExceptionHandler getUncaughtExceptionHandler( ) | |
java.Iang.Thread.UncaughtExceptionHandler | |
void UncaughtException(Thread t , Throwable e) | 当一个线程因未捕获异常而终止, 按规定要将客户报告记录到日志中。
|
同步
多个线程共享同一数据时,线程发出一个方法调用,在没有得到结果之前,这个调用就不返回,同时其它的线程也不能调用这个方法。
锁对象
有两种机制防止代码块受并发访问的干扰:
- Java 语言提供一个synchronized 关键字
- 并且Java SE 5.0 引入了ReentrantLock 类。
- 把解锁操作括在finally 子句之内是至关重要的。如果在临界区的代码抛出异常,锁必须被释放。否则,其他线程将永远阻塞。
- 如果使用锁, 就不能使用带资源的try 语句。
- 锁是可重入的, 因为线程可以重复地获得已经持有的锁。锁保持一个持有计数( holdcount ) 来跟踪对lock 方法的嵌套调用。线程在每一次调用lock 都要调用unlock 来释放锁。由于这一特性, 被一个锁保护的代码可以调用另一个使用相同的锁的方法。
示例:
public class Bank {
private Lock bankLock = new ReentrantLock0;// ReentrantLock implements the Lock interface
...
public void transfer(int from, int to, int amount) {
bankLock.lock();
try {
System.out.print(Thread.currentThread0);
accounts[from] -= amount;
System.out.printf(" X10.2f from %A to Xd", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: X10.2fXn", getTotalBalanceO);
}
finally {
banklock.unlockO;
}
}
}
- transfer 方法调用getTotalBalance 方法, 这也会封锁bankLock 对象,此时bankLock对象的持有计数为2。
- 当getTotalBalance 方法退出的时候, 持有计数变回1。
- 当transfer 方法退出的时候, 持有计数变为0。线程释放锁。
相关方法:
java.util.concurrent.locks.Lock | |
---|---|
void lock( ) | 获取这个锁;如果锁同时被另一个线程拥有则发生阻塞。 |
void unlock( ) | 释放这个锁。 |
java,util.concurrent.locks.ReentrantLock | |
ReentrantLock() | 构建一个可以被用来保护临界区的可重入锁。 |
ReentrantLock(boolean fair) | 构建一个带有公平策略的锁。
|
条件对象(Condition)
条件对象用于管理那些已经获得了一个锁但是却不能做有用工作的线程。
使用:
- 一个锁对象可以有一个或多个相关的条件对象,用 newCondition 方法获得一个条件对象。
class Bank { private Condition sufficientFunds; public Bank() { sufficientFunds = bankLock.newCondition(); } }
- 当条件不满足时,调用await:
sufficientFunds.await();
- 当前线程现在被阻塞了,并放弃了锁。
- 一旦一个线程调用await方法, 它进人该条件的等待集。当锁可用时,该线程不能马上解除阻塞。相反,它处于阻塞状态,直到另一个线程调用同一条件上的 signalAll 方法时为止。
- 重新激活因为这一条件而等待的所有线程signalAll:
sufficientFunds.signalAll();
- 至关重要的是最终需要某个其他线程调用 signalAll 方法
- 调用 signalAll 不会立即激活一个等待线程。它仅仅解除等待线程的阻塞,以便这些线程可以在当前线程退出同步方法之后,通过竞争实现对对象的访问。
示例:
public void transfer(int from, int to, int amount)
{
bankLock.lock();
try
{
while (accounts[from] < amount)
sufficientFunds.await();
// transfer funds
...
sufficientFunds.signalAll();
}
finally
{
bankLock.unlock();
}
}
synchronized 关键字
有关锁和条件的关键之处:
- 锁用来保护代码片段,任何时刻只能有一个线程执行被保护的代码。
- 锁可以管理试图进入被保护代码段的线程。
- 锁可以拥有一个或多个相关的条件对象。
- 每个条件对象管理那些已经进入被保护的代码段但还不能运行的线程。
synchronized 与 wait、notifyAll
从1.0 版开始,Java中的每一个对象都有一个内部锁。如果一个方法用 synchronized 关键字声明,那么对象的锁将保护整个方法。也就是说,要调用该方法, 线程必须获得内部的对象锁。 内部对象锁只有一个相关条件。wait 方法添加一个线程到等待集中,notifyAll /notify 方法解除等待线程的阻塞状态。
- 每一个对象有一个内部锁, 并且该锁有一个内部条件。
即:
public synchronized void methodO { method body }
- 等价于
public void methodQ { this.intrinsidock.1ock(); try { method body } finally { this.intrinsicLock.unlockO; } }
- wait 、notityAll等价于
intrinsicCondition.await(); intrinsicCondition.signalAll();
示例:
class Bank
{
private double[] accounts;
public synchronized void transfer(int from,int to, int amount) throws InterruptedException
{
while (accounts[from] < amount)
wait(); // wait on intrinsic object lock's single condition
accounts[from] -= amount ;
accounts[to] += amount ;
notifyAll(); // notify all threads waiting on the condition
}
public synchronized double getTotalBalanceO { . . . }
}
Synchronized 使用
- 将静态方法声明为 synchronized 时,该方法获得相关的类对象的内部锁,没有其他线程可以调用同一个类的这个或任何其他的同步静态方法。
- 例如, 如果Bank 类有一个静态同步的方法,那么当该方法被调用时,Bank.class对象的锁被锁住。
内部锁和条件存在一些局限。包括:
- 不能中断一个正在试图获得锁的线程。
- 试图获得锁时不能设定超时。
- 每个锁仅有单一的条件, 可能是不够的。
- 最好使用JUC框架(java.util.concurrent)提供的锁,而不是“Lock/Condition”和“synchronized”。
相关方法:
java.lang.Object | |
---|---|
void notifyAll() | 解除那些在该对象上调用 wait 方法的线程的阻塞状态。
|
void notify() | 随机选择一个在该对象上调用 wait 方法的线程,解除其阻塞状态。
|
void wait() | 导致线程进人等待状态直到它被通知。
|
void wait(long millis) | 导致线程进入等待状态直到它被通知或者经过指定的时间。
|
void wait(long millis, int nanos) |
同步阻塞
除了调用同步方法,还有另一种机制可以获得锁:通过进入一个同步阻塞。
如:
public class Bank
{
private doublet[] accounts;
private Object lock = new Object();
...
public void transfer(int from, int to, int amount)
{
synchronized (lock) // an ad-hoc lock
{
accounts[from] -= amount;
accounts[to] += amount;
}
System.out.print1n( ... );
}
}
- lock 对象被创建仅仅是用来使用每个Java 对象持有的锁。
监视器概念(monitor)
监视器可以在不需要程序员考虑如何加锁的情况下,就可以保证多线程的安全性。
监视器具有如下特性:
- 监视器是只包含私有域的类。
- 每个监视器类的对象有一个相关的锁。
- 使用该锁对所有的方法进行加锁。换句话说,如果客户端调用obj.methood() , 那么obj对象的锁是在方法调用开始时自动获得,并且当方法返回时自动释放该锁。因为所有的域是私有的,这样的安排可以确保一个线程在对对象操作时,没有其他线程能访问该域。
- 该锁可以有任意多个相关条件。
Java中以不是很精确的方式采用了监视器概念:
- 用 synchronized 关键字声明方法
- 通过调用 wait/notifyAll/notify 来访问条件变量
Java 对象不同于监视器, 从而使得线程的安全性下降:
- 域不要求必须是 private。
- 方法不要求必须是 synchronized。
- 内部锁对客户是可用的。【?】
Volatile 域
volatile 关键字为实例域的同步访问提供了一种免锁机制:
- 如果声明一个域为volatile,那么编译器和虚拟机就知道该域是可能被另一个线程并发更新的。
- volatile 不保证原子性
原子性
java.util.concurrent.atomic 包中有很多类使用了很高效的机器级指令(而不是使用锁) 来保证其他操作的原子性(操作不会中断)。
死锁(deadlock)
- 互斥访问:一个资源每次只能被一个线程使用。
- 请求与保持:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
- 不可抢占:线程已获得的资源,在未使用完之前,不能强行剥夺。
- 循环等待:若干线程之间形成一种头尾相接的循环等待资源关系。
相关:
- 银行家算法
线程局部变量(ThreadLocal)
使用ThreadLocal辅助类为各个线程提供各自的实例。
如:
public static final ThreadLocal<SimpleDateFormat> dateFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
String dateStamp = dateFormat.get().format(new Date());
- 在一个给定线程中首次调用get时,会调用 initialValue 方法;此后,get 方法会返回属于当前线程的那个实例。
相关方法:
java.lang.ThreadLocal<T> | |
---|---|
T get() | 得到这个线程的当前值。如果是首次调用get, 会调用initialize 来得到这个值。 |
protected initialize() | 应覆盖这个方法来提供一个初始值。默认情况下,这个方法返回mill。 |
void set(T t) | 为这个线程设置一个新值。 |
void remove() | 删除对应这个线程的值。 |
static <S> ThreadLocal <S> withlnitial ( Supplier< ? extends S> supplier ) | 创建一个线程局部变量, 其初始值通过调用给定的supplier 生成。 |
java.util.concurrent.ThreadLocalRandom | |
static ThreadLocalRandom current() | 返回特定于当前线程的Random 类实例。 |
锁测试域超时
应该更加谨慎地申请锁,以避免调用lock方法可能发生的阻塞。
- lock:一直申请锁,操作无法取消,不能被中断;
- tryLock:成功获得锁后返回true, 否则, 立即返回false, 且线程可以立即离开去做其他事情;【响应中断】
- lockInterruptibly:(相当于一个无线超时的tryLock)【响应中断】
相关方法:
java.util.concurrent.locks.Lock | |
---|---|
boolean tryLock() | 尝试获得锁而没有发生阻塞;如果成功返回真。
|
boolean tryLock(long time, TimeUnit unit) | 尝试获得锁,阻塞时间不会超过给定的值;如果成功返回true。 |
void lockInterruptibly() | 获得锁,但是会不确定地发生阻塞。
|
java.util.concurrent.locks.Condition | |
boolean await(long time , TimeUnit unit) | 进人该条件的等待集,直到线程从等待集中移出或等待了指定的时间之后才解除阻塞。如果因为等待时间到了而返回就返回false,否则返回true。 |
void awaitUninterruptibly() | 进人该条件的等待集,直到线程从等待集移出才解除阻塞。
|
读/写锁(ReenterantReadWriteLock)
“java.util.concurrent.locks” 包定义了两个锁类:“ReentrantLock”类和ReentrantReadWriteLock类。
- 二者实现了不同的类:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { public class ReentrantLock implements Lock, java.io.Serializable {
- ReentrantReadWriteLock 允许读线程共享访问,写线程互斥;
使用步骤:
- 构造一个ReentrantReadWriteLock 对象:
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock():
- 抽取读锁和写锁:
private Lock readLock = rwl.readLock(); private Lock writeLock = rwl.writeLock();
- 对所有的获取方法加读锁:
public double getTotalBalance() { readLock.lock(); try { . . . } finally { readLock.unlock(); } }
- 对所有的修改方法加写锁:
public void transfer(. . .) { writeLock.lock(); try { . . . } finally { writeLock.unlock(); } }
相关方法:
java.util.concurrent.locks.ReentrantReadWriteLock | |
---|---|
Lock readLock() | 得到一个可以被多个读操作共用的读锁,但会排斥所有写操作。 |
Lock writeLock() | 得到一个写锁,排斥所有其他的读操作和写操作。 |
为什么弃用 stop 和 suspend 方法
stop、suspend 和resume 方法已经弃用:stop 方法天生就不安全,经验证明suspend 方法(resume 用于suspend之后恢复)会经常导致死锁。
stop
该方法终止所有未结束的方法,包括run 方法。当线程被终止,立即释放被它锁住的所有对象的锁【而并非导致锁不能被释放】。这会导致对象处于不一致的状态。
- 例如,假定TransferThread在从一个账户向另一个账户转账的过程中被终止,钱款已经转出,却没有转人目标账户,现在银行对象就被破坏了。因为锁已经被释放,这种破坏会被其他尚未停止的线程观察到。
见源码注释:
* @deprecated This method is inherently unsafe. Stopping a thread with * Thread.stop causes it to unlock all of the monitors that it * has locked (as a natural consequence of the unchecked * <code>ThreadDeath</code> exception propagating up the stack). If * any of the objects previously protected by these monitors were in * an inconsistent state, the damaged objects become visible to * other threads, potentially resulting in arbitrary behavior. Many * uses of <code>stop</code> should be replaced by code that simply * modifies some variable to indicate that the target thread should * stop running. The target thread should check this variable * regularly, and return from its run method in an orderly fashion * if the variable indicates that it is to stop running. If the * target thread waits for long periods (on a condition variable, * for example), the <code>interrupt</code> method should be used to * interrupt the wait.
suspend
与stop 不同,suspend 不会破坏对象。但是,如果用suspend 挂起一个持有一个锁的线程,那么该锁在恢复之前是不可用的。
- 如果调用suspend 方法的线程试图获得同一个锁,那么程序死锁:
阻塞队列(BlockingQueue)
阻塞队列(blocking queue):当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,其会导致线程阻塞。
阻塞队列方法
- 抛出异常:“add”、“element”、“remove”;
- 阻塞:“put”、“take”;
- 返回false或null为提示:“offer”、“peek”、“poll”;(多线程中应采用)
阻塞队列类型
- LinkedBlockingQueue:双端阻塞队列,默认容量没有边界(可指定容量);
- ArrayBlockingQueue:在构造时需要指定容量,并且有一个可选的参数来指定是否需要公平性;
- 若设置了公平参数, 则那么等待了最长时间的线程会优先得到处理。(通常,公平性会降低性能, 只有在确实非常需要时才使用它。)
- PriorityBlockingQueue:是一个带优先级的队列,而不是先进先出队列。元素按照它们的优先级顺序被移出;
- 该队列是没有容量上限,但是,如果队列是空的,取元素的操作会阻塞。(有关优先级队列的详细内容参看第9 章。)
- DelayQueue:延迟阻塞队列;
- DelayQueue 包含实现Delayed 接口的对象(“Delayed”:
interface Delayed extends Comparable<Delayed>
),所以还必须实现compareTo 方法; - “getDelay”方法返回对象的残留延迟:负值表示延迟已经结束。(元素只有在延迟用完的情况下才能从DelayQueue 移除)
- DelayQueue 包含实现Delayed 接口的对象(“Delayed”:
- 其中均使用了“ReentrantLock”(域或局部变量):
private final ReentrantLock lock;
- 其中均继承了“AbstractQueue<E>”、实现了“BlockingQueue<E>”和“java.io.Serializable”:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
- JavaSE 7 增加了一个 TransferQueue 接口,允许生产者线程等待,直到消费者准备就绪可以接收一个元素。
- 如果生产者调用
q.transfer(item i);
;这个调用会阻塞, 直到另一个线程将元素(item) 删除。
- 如果生产者调用
相关方法:
java.util.concurrent.ArrayBlockingQueue<E> | |
---|---|
ArrayBlockingQueue(int capacity) | 构造一个带有指定的容量和公平性设置的阻塞队列。该队列用循环数组实现。 |
ArrayBlockingQueue(int capacity, boolean fair) | |
java.util.concurrent.LinkedBlockingQueue<E> | |
java.util.concurrent.LinkedBlockingDeque<E> | |
LinkedBlockingQueue() | 构造一个无上限的阻塞队列或双向队列,用链表实现。 |
LinkedBlockingDeque() | |
LinkedBlockingQueue(int capacity) | 根据指定容量构建一个有限的阻塞队列或双向队列,用链表实现。 |
LinkedBlockingDeque(int capacity) | |
java.util.concurrent.DelayQueue<E extends Delayed> | |
DelayQueue() | 构造一个包含Delayed 元素的无界的阻塞时间有限的阻塞队列。只有那些延迟已经超过时间的元素可以从队列中移出。 |
java.util.concurrent.Delayed | |
long getDelay(Timellnit unit) | 得到该对象的延迟,用给定的时间单位进行度量。 |
java.util.concurrent.PriorityBlockingQueue<E> | |
PriorityBlockingQueue() | 构造一个无边界阻塞优先队列,用堆实现。
|
PriorityBlockingQueue(int initialCapacity) | |
PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) | |
java.util.concurrent.BlockingQueue<E> | |
void put (E element) | 添加元素,在必要时阻塞。 |
E take() | 移除并返回头元素,必要时阻塞。 |
boolean offer (E element , long time , TimeUnit unit) | 添加给定的元素,如果成功返回true,如果必要时阻塞,直至元素已经被添加或超时。 |
E poll (long time , TimeUnit unit) | 移除并返回头元素,必要时阻塞,直至元素可用或超时用完。失败时返回null。 |
java.util.concurrent.BiockingDeque<E> | |
void putFirst(E element) | 添加元素,必要时阻塞。 |
void putLast(E element) | |
E takeFirst () | 移除并返回头元素或尾元素,必要时阻塞。 |
E takeLast () | |
boolean offerFirst (E element, long time, TimeUnit unit) | 添加给定的元素,成功时返回true,必要时阻塞直至元素被添加或超时。 |
boolean offerLast (E element, long time, TimeUnit unit) | |
E pol1First(1ong time, TimeUnit unit) | 移动并返回头元素或尾元素,必要时阻塞,直至元素可用或超时。失败时返回mill。 |
E pol1Last (1ong time, TimeUnit unit) | |
java.util.concurrent.Transfer Queue<E> | |
void transfer (E element) | 传输一个值,或者尝试在给定的超时时间内传输这个值,这个调用将阻塞,直到另一个线程将元素删除。第二个方法会在调用成功时返回true。 |
boolean tryTransfer (E element, long time, TimeUnit unit) |
线程安全的集合
如果多线程要并发地修改一个数据结构,那么很容易会破坏这个数据结构:
- 可以通过提供锁来保护共享数据结构,但是选择线程安全的实现作为替代可能更容易些。
高效的 Map、Set 和 Queue
java.util.concurrent 包提供了映射、有序集和队列的高效实现:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet和ConcurrentLinkedQueue。
- 这些集合使用复杂的算法,通过允许并发地访问数据结构的不同部分来使竞争极小化。
- 与大多数集合不同,size 方法不必在常量时间内操作。确定这样的集合当前的大小通常需要遍历。
ConcurrentHashMap
有些应用使用庞大的并发散列映射,这些映射太过庞大,以至于无法用size 方法得到它的大小,因为这个方法只能返回int。对于一个包含超过20 亿条目的映射该如何处理? JavaSE 8 引入了一个mappingCount 方法可以把大小作为long 返回。
- mappingCount:用于返回映射数量(ConcurrentHashMap包含的映射可能多于可以表示为int的映射,所以不应该使用“size()”);
- 返回的值是一个估计值;如果存在并发的插入或删除操作,则实际计数可能会有所不同。
/** * Returns the number of mappings. This method should be used * instead of {@link #size} because a ConcurrentHashMap may * contain more mappings than can be represented as an int. The * value returned is an estimate; the actual count may differ if * there are concurrent insertions or removals. * * @return the number of mappings * @since 1.8 */ public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values }
并发的散列映射表,可高效地支持大量的读者和一定数量的写者。默认情况下,假定可以有多达16 个写者线程同时执行。可以有更多的写者线程,但是,如果同一时间多于16个,其他线程将暂时被阻塞。可以指定更大数目的构造器,然而,恐怕没有这种必要。 散列映射将有相同散列码的所有条目放在同一个 桶”中。有些应用使用的散列函数不当,以至于所有条目最后都放在很少的桶中,这会严重降低性能。即使是一般意义上还算合理的散列函数,如String 类的散列函数,也可能存在问题。例如,攻击者可能会制造大量有相同散列值的字符串,让程序速度减慢。在JavaSE 8 中,并发散列映射将桶组织为树,而不是列表,键类型实现了Comparable,从而可以保证性能为O(log(n))。
- 在JavaSE 8 中,ConcurrentHashMap将桶组织为树,而不是列表,键类型实现了“Comparable”,从而可以保证性能为O(log(n));
关于迭代器?
集合返回弱一致性( weakly consistent) 的迭代器。这意味着迭代器不一定能反映出它们被构造之后的所有的修改, 但是, 它们不会将同一个值返回两次,也不会拋出ConcurrentModificationException 异常。 与之形成对照的是, 集合如果在迭代器构造之后发生改变,java.util 包中的迭代器将抛出一个ConcurrentModificationException 异常。
- 集合如果在迭代器构造之后发生改变,java.util 包中的迭代器将抛出一个ConcurrentModificationException异常。
【???】
相关方法:
java.util.concurrent.ConcurrentLinkedQueue<E> | |
---|---|
ConcurrentLinkedQueue<E>() | 构造一个可以被多线程安全访问的无边界非阻塞的队列。 |
java.util.concurrent.ConcurrentLinkedQueue<E> | |
ConcurrentSkipListSet<E>() | 构造一个可以被多线程安全访问的有序集。第一个构造器要求元素实现Comparable接口。 |
ConcurrentSkipListSet<E>(Comparator<? super E> comp) | |
java.util.concurrent.ConcurrentHashMap<K, V> | |
java.util.concurrent.ConcurrentSkipListMap<K, V> | |
ConcurrentHashMap<K, V>() | 构造一个可以被多线程安全访问的散列映射表。
参数:
|
ConcurrentHashMap<K, V>(int initialCapacity) | |
ConcurrentHashMap<K, V>(int initialCapacity, float loadFactor, int concurrencyLevel) | |
ConcurrentSkipListMap<K, V>() | 构造一个可以被多线程安全访问的有序的映像表。第一个构造器要求键实现 Comparable 接口。 |
ConcurrentSkipListSet<K, V>(Comparator<? super K> comp) |
Map 条目的原子更新
- 线程安全:是指线程之间对同一个对象的访问不会出现同步错误;
- 原子性:是指操作过程(在cpu、寄存器中执行时)不会被打断;
映射条目的原子更新,不仅讨论“ConcurrentHashMap”,其他基本的“Map”亦是如此。
replace 方法更新
传统的做法是使用replace 操作:
- 它会以原子方式用一个新值替换原值,前提是之前没有其他线程把原值替换为其他值。
- (使用do-while:必须一直这么做,直到replace 成功)
do
{
oldValue = map.get(word);
newValue = oldValue = null ? 1 : oldValue + 1;
} while (!map.replace(word, oldValue, newValue));
使用 AtomicLong 类型的Value
- AtomicLong和LongAdder:数据类型类,其操作都是原子操作(都是“java.util.concurrent.atomic”包下的类);
- (区别于非原子操作的基本数据类型,如:“int”、“String”等)
public class AtomicLong extends Number implements java.io.Serializable {
public class LongAdder extends Striped64 implements Serializable {
使用“AtomicLong”代替基本数据类型,作为“ConcurrentHashMap<K,V>”中 V 的类型,则,对于数据的操作都是原子操作:
- 即构造“ConcurrentHashMap<String,AtomicLong>”;
- 或者在Java SE 8 中,还可以构造“ConcurrentHashMap<String,LongAdder>”;
如:
map.putlfAbsent(word, new LongAdder());
map.get(word).increment();
其他方法
Java SE 8 提供了一些可以更方便地完成原子更新的方法:
- compute:用于计算新值,需要提供一个键和一个计算新值的函数:
map.compute(word, (k, v) -> v = null ? 1: v + 1);
- computelfPresent:已经有原值的情况下计算新值;
- computelfAbsent:没有原值的情况下计算新值;
- ConcurrentHashMap 中不允许有null 值。有很多方法都使用null 值来指示映射中某个给定的键不存在。
ConcurrentHashMap 的批操作(search、reduce、forEach)
Java SE 8 为 ConcurrentHashMap 提供了批操作,即使有其他线程在处理映射,这些操作也能安全地执行:
- 批操作会遍历映射,处理遍历过程中找到的元素。
- 有3 种不同的操作:
- 搜索(search) :为每个键或值提供一个函数,直到函数生成一个非null 的结果。然后搜索终止,返回这个函数的结果。
- 归约(reduce) :组合所有键或值, 这里要使用所提供的一个累加函数。
- forEach:为所有键或值提供一个函数。
- 每个操作都有4 个版本:
- operationKeys:处理键。
- operatioriValues:处理值。
- operation:处理键和值。
- operatioriEntries:处理Map.Entry 对象。
- 对于上述各个操作,需要指定一个参数化阈值(parallelism threshold):如果映射包含的元素多于这个阈值,就会并行完成批操作。
- 如果希望批操作在一个线程中运行,可以使用阈值“Long.MAX_VALUE”;
- 如果希望用尽可能多的线程运行批操作,可以使用阈值“1”;
- 对于int、long 和double 输出还有相应的特殊化操作, 分别有后缀“Tolnt”、“ToLong”和“ToDouble”。
- 如:educeValuesToLong()、educeValuesTolnt()
示例
如,“search”的操作:
- U searchKeys(long threshold, BiFunction<? super K , ? extends U> f)
- U searchValues(long threshold, BiFunction<? super V, ? extends U> f)
- U search(long threshold, BiFunction<? super K, ? super V,? extends U> f)
- U searchEntries(long threshold, BiFunction<Map.Entry<K, V>, ? extends U> f)
如,“search”的使用:
“找出第一个出现次数超过1000 次的单词。需要搜索键和值”
String result = map.search(threshold, (k, v) -> v > 1000 ? k : null );
并发视图集
假设你想要的是一个大的线程安全的集而不是映射。并没有一个ConcurrentHashSet 类,而且你肯定不想自己创建这样一个类。当然,可以使用ConcurrentHashMap ( 包含“ 假” 值),不过这会得到一个映射而不是集, 而且不能应用Set 接口的操作。 静态newKeySet 方法会生成一个Set<K>, 这实际上是ConcurrentHashMap<K, Boolean〉的一个包装器。(所有映射值都为Boolean.TRUE, 不过因为只是要把它用作一个集, 所以并不关心具体的值。) Set<String> words = ConcurrentHashMap.<String>newKeySet() ; 当然, 如果原来有一个映射,keySet 方法可以生成这个映射的键集。这个集是可变的。如果删除这个集的元素,这个键(以及相应的值)会从映射中删除。不过, 不能向键集增加元素,因为没有相应的值可以增加。Java SE 8 为ConcurrentHashMap 增加了第二个keySet 方法,包含一个默认值,可以在为集增加元素时使用: Set<String> words = map.keySet (1L) ; words.add("]'ava”); 如果"Java” 在words 中不存在, 现在它会有一个值 1。
写数组的拷贝(CopyOnWriteArrayList、CopyOnWriteArraySet)
CopyOnWriteArrayList 和CopyOnWriteArraySet 是线程安全的集合,其中所有的修改线程对底层数组进行复制。
当构建一个迭代器的时候, 它包含一个对当前数组的引用。 如果数组后来被修改了,迭代器仍然引用旧数组, 但是,集合的数组已经被替换了。因而,旧的迭代器拥有一致的(可能过时的)视图,访问它无须任何同步开销。
- 即:迭代器对原集合进行访问,而copy一份数组进行修改操作,在执行完修改操作后将原来集合指向新的集合来完成修改操作;
- 加了写锁的ArrayList和ArraySet,锁住的是整个对象,但读操作可以并发执行
并行数组算法
在Java SE 8 中, Arrays 类提供了大量并行化操作。
如,静态Arrays.parallelSort 方法可以对一个基本类型值或对象的数组排序。
String contents = new String(Fi1es.readAl1Bytes(Paths.get("alice.txt")), StandardCharsets.UTFJ); // Read file into string
String[] words = contents.split("[\\P{L}]+"); // Split along nonletters
Arrays,parallelSort(words):
较早的线程安全集合
从Java 的初始版本开始,Vector 和Hashtable 类就提供了线程安全的动态数组和散列表的实现。
现在这些类被弃用了, 取而代之的是AnayList 和HashMap 类。这些类不是线程安全的,而集合库中提供了不同的机制。
任何集合类都可以通过使用同步包装器( synchronizationwrapper) 变成线程安全的:
List<E> synchArrayList = Col lections ,synch ronizedList (new ArrayList<E>()) ;
Map<K, V> synchHashMap = Col1ections.synchroni zedMap(new HashMap<K, V>());
结果集合的方法使用锁加以保护, 提供了线程安全访问。
- 最好使用“java.Util.Concurrent”包中定义的集合,不使用同步包装器中的。特别是,假如它们访问的是不同的桶,由于ConcurrentHashMap 已经精心地实现了, 多线程可以访问它而且不会彼此阻塞。有一个例外是经常被修改的数组列表。在那种情况下, 同步的ArrayList 可以胜过CopyOnWriteArrayList()。
相关方法:
java.util.Collections | |
---|---|
static <E> Collection<E> synchronizedCollection(Collection<E> c) | 构建集合视图, 该集合的方法是同步的。 |
static <E> List synchronizedList(List<E> c) | |
static <E> Set synchronizedSet(Set<E> c) | |
static <E> SortedSet synchronizedSortedSet(SortedSet<E> c) | |
static <K, V> Map<K, V> synchronizedMap(Map<K, V > c) | |
static <K, V> SortedMap<K, V> synchronizedSortedMap(SortedMap<K, V> c) |
异步运行接口(Callable 与 Future)
- Runnable:封装一个异步运行的任务,可以把它想象成为一个没有参数和返回值的异步方法。
package java.lang; @FunctionalInterface public interface Runnable { public abstract void run(); }
- Callable:与Runnable 类似,但是有返回值。
package java.util.concurrent; @FunctionalInterface public interface Callable<V> { V call() throws Exception; }
- 类型参数是返回值的类型。如
Callable<Integer>
表示一个最终返回Integer对象的异步计算。
- Future:保存异步计算的结果;
public interface Future<V> { V get() throws . .; V get(long timeout, TimeUnit unit) throws . .; void cancel (boolean maylnterrupt); boolean isCancelled(); boolean isDone(); }
- FutureTask:包装器,用于将“Callable”转换成“Future”和“Runnable”;
public class FutureTask<V> implements RunnableFuture<V> {
- FutureTask 实现了“RunnableFuture<V>”接口,而“RunnableFuture<V>”同时实现了“Runnable”和“Future<V>”接口;
示例:
Callable<Integer> myComputation = . . .;
FutureTask<Integer> task = new FutureTask<Integer>(myConiputation);
Thread t = new Thread(task); // it's a Runnable
t.start();
Integer result = task.get(); // it's a Future
相关方法:
java.util.concurrent.Callable<V> | |
---|---|
V call() | 运行一个将产生结果的任务。 |
java.util.concurrent.Future<V> | |
V get() | 获取结果,如果没有结果可用,则阻塞直到真正得到结果超过指定的时间为止。
|
V get(long time, TimeUnit unit) | |
boolean cancel(boolean mayInterrupt) | 尝试取消这一任务的运行。如果任务已经开始, 并且maylnterrupt 参数值为true, 它就会被中断。如果成功执行了取消操作, 返回true。 |
boolean isCancelled() | 如果任务在完成前被取消了, 则返回true。 |
boolean isDone() | 如果任务结束,无论是正常结束、中途取消或发生异常, 都返回true。 |
java.util.concurrent.FutureTask<V> | |
FutureTask(Callable<V> task) | 构造一个既是Future<V> 又是Runnable 的对象。 |
FutureTask(Runnable task, V result) |
执行器(Executor)
构建一个新的线程是有一定代价的, 因为涉及与操作系统的交互。创建大量线程会大大降低性能甚至使虚拟机崩溃。
所以,如果程序中创建了大量的生命期很短的线程,应该使用线程池( thread pool ):
一个线程池中包含许多准备运行的空闲线程。将Runnable 对象交给线程池, 就会有一个线程调用run 方法。当run 方法退出时, 线程不会死亡,而是在池中准备为下一个请求提供服务。 如果有一个会创建许多线程的算法, 应该使用一个线程数“ 固定的” 线程池以限制并发线程的总数。
执行器(Executor) 类有许多静态工厂方法用来构建线程池:
方法 | 描述 |
---|---|
newCachedThreadPool | 必要时创建新线程;空闲线程会被保留60 秒 |
newFixedThreadPool | 该池包含固定数量的线程;空闲线程会一直被保留 |
newScheduledThreadPool | 用于预定执行而构建的固定线程池, 替代java.util.Timer |
newSingleThreadExecutor | 只有一个线程的“池”, 该线程顺序执行每一个提交的任务(类似于 Swing 事件分配线程) |
newSingleThreadScheduledExecutor | 用于预定执行而构建的单线程“ 池” |
- (多线程:池“Pool”;单线程:执行器“Executor”)
- “Executor”与“Executors”
- “Executor”:执行器接口,定义了方法“execute”;
- “Executors”:执行器类,包括了上述方法;
线程池(thread pool)
- Executors 类用于创建线程池(区别于 Executor 接口);
“newCachedThreadPool”、“newFixedThreadPool”、“newSingleThreadExecutor”三个方法返回实现了“ExecutorService”接口的“ThreadPoolExecutor”类的对象。
- “ThreadPoolExecutor”的继承层级:
public interface Executor { public interface ExecutorService extends Executor { public abstract class AbstractExecutorService implements ExecutorService { public class ThreadPoolExecutor extends AbstractExecutorService {
使用连接池时应该做的事:
- 调用Executors 类中静态的方法newCachedThreadPool 或newFixedThreadPool。
- 调用submit 提交Runnable 或Callable 对象。
- 如果想要取消一个任务, 或如果提交Callable 对象, 那就要保存好返回的Future对象。
- 当不再提交任何任务时,调用shutdown。
- “shutdow”:启动该池的关闭序列,被关闭的执行器不再接受新的任务。(当所有任务都完成以后,线程池中的线程死亡)
- “shutdowNow”:取消尚未开始的所有任务并试图中断正在运行的线程。
- 可用下面方法将Runnable对象或Callable对象提交给ExecutorService:
- “Future<?> submit(Runnable task)”:返回 Future<?> 的 get 方法在完成的时候只是简单地返回 null;
- “Future<T> submit(Runnable task, T result)”:返回 Future 的 get 方法在完成的时候返回指定的 result 对象;
- “Future<T> submit(Callable<T> task)”:返回的 Future 对象将在计算结果准备好的时候得到它;
相关方法:
java.util.concurrent.Executors | |
---|---|
ExecutorService newCachedThreadPool() | 返回一个带缓存的线程池, 该池在必要的时候创建线程, 在线程空闲60 秒之后终止线程。 |
ExecutorService newFixedThreadPool(int threads) | 返回一个线程池, 该池中的线程数由参数指定。空闲线程会一直被保留。 |
ExecutorService newSingleThreadExecutor() | 返回一个执行器, 它在一个单个的线程中依次执行各个任务。 |
java.util.concurrent.ExecutorService | |
Future<T> submit(Callable<T> task) | 提交指定的任务去执行。 |
Future<T> submit(Runnable task, T result) | |
Future<?> submit(Runnable task) | |
void shutdown() | 关闭服务, 会先完成已经提交的任务而不再接收新的任务。 |
java.util.concurrent.ThreadPoolExecutor | |
int getLargestPoolSize() | 返回线程池在该执行器生命周期中的最大尺寸。 |
预定执行(ScheduledExecutorService)
- ScheduledExecutorService 接口具有为预定执行( Scheduled Execution ) 或重复执行任务而设计的方法。它是一种允许使用线程池机制的java.util.Timer 的泛化。
- 可以预定Runnable 或Callable 在初始的延迟之后只运行一次。也可以预定一个Runnable对象周期性地运行。
- Executors 类的“newScheduledThreadPool” 和“newSingleThreadScheduledExecutor” 方法将返回实现了“ScheduledExecutorService” 接口的对象。
- “ScheduledExecutorService”的继承层级:
public interface Executor { public interface ExecutorService extends Executor { public interface AbstractExecutorService extends ExecutorService { public interface ScheduledExecutorService extends ExecutorService { public class ThreadPoolExecutor extends AbstractExecutorService { public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
相关方法:
java.util.concurrent.Executors | |
---|---|
ScheduledExecutorService newScheduledThreadPool(int threads) | 返回一个线程池, 它使用给定的线程数来调度任务。 |
ScheduledExecutorService newSingleThreadScheduledExecutor( ) | 返回一个执行器, 它在一个单独线程中调度任务。 |
java.util.concurrent.ScheduledExecutorService | |
ScheduledFuture<V> schedule(Callable<V> task , long time, Timellnit unit ) | 预定在指定的时间之后执行任务。 |
ScheduledFuture<?> schedule(Runnable task , long time , TimeUnit unit ) | |
ScheduledFuture<?> scheduleAtFixedRate(Runnable task , long initialDelay , long period, TimeUnit unit ) | 预定在初始的延迟结束后, 周期性地运行给定的任务, 周期长度是period。 |
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task , long initialDelay , long delay, TimeUnit unit ) | 预定在初始的延迟结束后周期性地运行给定的任务, 在一次调用完成和下一次调用开始之间有长度为delay 的延迟。 |
控制任务组
使用执行器有更有实际意义的原因,控制一组相关任务:
- invokeAny 方法提交所有对象到一个Callable 对象的集合中, 并返回某个已经完成了的任务的结果。
- 无法知道返回的究竟是哪个任务的结果, 也许是最先完成的那个任务的结果。
- invokeAll 方法提交所有对象到一个Callable 对象的集合中,并返回一个Future 对象的列表,代表所有任务的解决方案。
- 如:
List<Callab1e<T>> tasks = . . .; List<Future<T>> results = executor.invokeAll(tasks); for (Future<T> result : results) processFurther(result.get());
可以用ExecutorCompletionService 来进行任务排列:
- 构建一个ExecutorCompletionService, 提交任务给完成服务(completion service)。
- 该服务管理Future 对象的阻塞队列,其中包含已经提交的任务的执行结果(当这些结果成为可用时)。
如:
ExecutorCompletionService<T> service = new ExecutorCompletionServiceo(executor):
for (Callable<T> task : tasks)
service,submit(task) ;
for (int i = 0; i < tasks.sizeO;i ++)
processFurther(servi ce.take().get());
相关方法:
java.util.concurrent.ExecutorService | |
---|---|
T invokeAny(Collection<Callable<T>> tasks ) | 执行给定的任务, 返回其中一个任务的结果。第二个方法若发生超时, 抛出一个Timeout Exception 异常。 |
T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit ) | |
List<Future<T>> invokeAll(Collection<Callable<T>> tasks ) | 执行给定的任务, 返回所有任务的结果。第二个方法若发生超时, 拋出一个TimecmtException 异常。 |
List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit ) | |
java.util.concurrent.ExecutorCompletionService<V> | |
ExecutorCompletionService(Executor e) | 构建一个执行器完成服务来收集给定执行器的结果。 |
Future<V> submit(Callable< V > task) | 提交一个任务给底层的执行器。 |
Future<V> submit(Runnable task, V result) | |
Future<V> take() | 移除下一个已完成的结果, 如果没有任何已完成的结果可用则阻塞。 |
Future<V> poll() | 移除下一个已完成的结果, 如果没有任何已完成结果可用则返回null。第二个方法将等待给定的时间。 |
Future<V> poll(long time , TimeUnit unit) |
Fork-Join 框架
Fork-Join 框架,(Java SE 7 中引入)用于完成计算密集型任务(任务可以分解为子任务,对每个处理器内核分别使用一个线程来运行子任务)
if (problemSize < threshold)
solve problem directly
else
{
break problem into subproblems
recursively solve each subproblem
combine the results
}
假设想统计一个数组中有多少个元素满足某个特定的属性。可以将这个数组一分为二,分别对这两部分进行统计, 再将结果相加。 要采用框架可用的一种方式完成这种递归计算, 需要提供一个扩展RecursiveTask() 的类(如果计算会生成一个类型为T 的结果)或者提供一个扩展RecursiveActicm 的类(如果不生成任何结果)。再覆盖compute 方法来生成并调用子任务, 然后合并其结果。 class Counter extends RecursiveTask<Integer> { ... protected Integer compute() { if (to - from < THRESHOLD) { solve problem directly } else { int mid = (from + to) / 2; Counter first = new Counter(va1ues, from, mid, filter) ; Counter second = new Counter(va1ues, mid, to, filter); invokeAll(first, second); return first.join() + second.join(); } } } 在这里, invokeAll 方法接收到很多任务并阻塞, 直到所有这些任务都已经完成。join 方法将生成结果。我们对每个子任务应用了join, 并返回其总和。 还有一个get 方法可以得到当前结果, 不过一般不太使用, 因为它可能抛出已检查异常, 而在compute 方法中不允许抛出这些异常。
在后台, fork-join 框架使用了一种有效的智能方法来平衡可用线程的工作负载,这种方法称为工作密取(work stealing)。每个工作线程都有一个双端队列( deque ) 来完成任务。一个工作线程将子任务压人其双端队列的队头。(只有一个线程可以访问队头, 所以不需要加锁。)一个工作线程空闲时,它会从另一个双端队列的队尾“ 密取” 一个任务。由于大的子任务都在队尾, 这种密取很少出现。
可完成 Future
Java SE 8 的CompletableFuture 类提供了一种候选方法,用于组合任务(future),并指定执行的顺序。
【???】
同步器
java.util.concurrent 包含了用于管理相互合作的线程集的类:
- 用于控制线程执行顺序控制
类 | 作用 | 说明 |
---|---|---|
CyclicBarrier | 允许线程集等待直至其中预定数目的线程到达一个公共障栅(barrier),然后可以选择执行一个处理障栅的动作 | 当大量的线程需要在它们的结果可用之前完成时 |
Phaser | 类似于循环障栅, 不过有一个可变的计数 | Java SE 7 中引人 |
CountDownLatch | 允许线程集等待直到计数器减为0 | 当一个或多个线程需要等待直到指定数目的事件发生 |
Exchanger | 允许两个线程在要交换的对象准备好时交换对象 | 当两个线程工作在同一数据结构的两个实例上的时候, 一个向实例添加数据而另一个从实例清除数据 |
Semaphore | 允许线程集等待直到被允许继续运行为止 | 限制访问资源的线程总数。如果许可数是1,常常阻塞线程直到另一个线程给出许可为止 |
SynchronousQueue | 允许一个线程把对象交给另一个线程 | 在没有显式同步的情况下,当两个线程准备好将一个对象从一个线程传递到另一个时 |
- 这些机制具有为线程之间的共用集结点模式(common rendezvous patterns ) 提供的“ 预置功能”( canned functionality )。【???】
信号量(Semaphore)
Semaphore翻译成字面意思为信号量,Semaphore可以控制同时访问资源的线程个数,通过“acquire()”获取一个许可,如果没有就等待,而“release()”释放一个许可。
方法
- Semaphore类位于java.util.concurrent包下
- 提供了2个构造器:
public Semaphore(int permits){ //参数permits表示许可数目,即同时可以允许多少线程进行访问 sync = new NonfairSync(permits); } public Semaphore(int permits,boolean fair){ //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可 sync = (fair)?new FairSync(permits):new NonfairSync(permits); }
- 重要的方法:
// 阻塞方法 public void acquire() throws InterruptedException{ }; //获取一个许可,若无许可能够获得,则会一直等待,直到获得许可 public void acquire(int permits) throws InterruptedException{ }; //尝试获取permits个许可 public void release() { }; //释放许可 public void release(int permits) { }; //释放permits个许可 public int availablePermits() { }; //得到可用的许可数目 // 不阻塞,理解得到结果 public boolean tryAcquire(){ }; //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire(long timeout,TimeUnit unit) throws InterruptedException{ }; //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则立即返回false public boolean tryAcquire(int permits){ }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire(int permits,long timeout,TimeUnit unit) throws InterruptedException{ }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则立即返回false
示例
package code;
import java.util.concurrent.Semaphore;
public class tesy{
public static void main(String[] args){
int N = 8;
Semaphore semaphore = new Semaphore(5); //机器数目
for(int i=0;i<N;i++)
new Worker(i,semaphore).start();
}
static class Worker extends Thread{
private int i;
private Semaphore semaphore;
public Worker(int i,Semaphore semaphore){
this.i = i;
this.semaphore = semaphore;
}
public void run(){
try{
semaphore.acquire();
System.out.println("工人"+this.i+"占用一个机器");
Thread.sleep(1000);
System.out.println("工人"+this.i+"释放出机器");
semaphore.release();
}catch(Exception e){
e.printStackTrace();
}
}
}
}
执行结果为:
工人0占用一个机器在生产...
工人1占用一个机器在生产...
工人2占用一个机器在生产...
工人3占用一个机器在生产...
工人4占用一个机器在生产...
工人1释放出机器
工人0释放出机器
工人4释放出机器
工人5占用一个机器在生产...
工人2释放出机器
工人7占用一个机器在生产...
工人3释放出机器
工人6占用一个机器在生产...
工人5释放出机器
工人6释放出机器
工人7释放出机器
倒计时门栓(CountDownLatch)
CountDownLatch 可以实现实现计数等待,主要用于某个线程等待其他几个线程。
- CountDownLatch 是一次性的,一旦计数值为0,就不能再用了;
方法
- CyclicBarrier类位于java.util.concurrent包下
- CountDownLatch类只提供了一个构造器:
public CountDownLatch(int count){ };//参数count为计数值
- CountDownLatch类中最重要的方法:
public void await() thows InterruptedException{ }; //调用await()方法的线程会被挂起,它会等待直到count为0时才继续执行 public boolean await( long timeout,TimeUnit unit) throws InterruptedException{ }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行 public void countDown(){ }; //将count值间1
示例
如:可以设置门栓,让一个线程等待计数器为0,来实现该线程对于其他线程的等待:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
public static void main(String[] args) {
// 创建计数器,初始化为2
final CountDownLatch latch = new CountDownLatch(2);
new Thread(() -> {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown(); // 减一
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");
Thread.sleep(3000);
System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
try {
System.out.println("等待2个子线程执行完毕...");
// 阻塞
latch.await();
System.out.println("2个子线程已经执行完毕");
System.out.println("继续执行主线程");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
显示结果为:
执行的结果:
子线程Thread-0正在执行
等待2个子线程执行完毕...
子线程Thread-1正在执行
子线程Thread-0执行完毕
子线程Thread-1执行完毕
2个子线程已经执行完毕
继续执行主线程
障栅(CyclicBarrier)
当一个线程完成了它的那部分任务后,我们让它运行到障栅处。一旦所有的线程都到达了这个障栅,障栅就撤销, 线程就可以继续运行。
CyclicBarrier 类实现了一个集结点( rendezvous) 称为障栅( barrier ),通过它可以实现让一组线程等待至某个状态之后再全部同时执行:
- 当调用await()方法之后,线程就被拦在barrier外了;
- 当所有等待线程都被释放以后,CyclicBarrier可以被重用(不同于“CountDownLatch”的一次性);
方法
- CyclicBarrier类位于java.util.concurrent包下
- CyclicBarrier提供构造器:
public CyclicBarrier(int parties, Runnable barrierAction){ ... } public CyclicBarrier(int parties){ ... }
- CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供构造器:
public int await() throws InterruptedException,BrokenBarrierException{ }; public int await(long timeout,TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException{ };
- 第一个版本比较常用,用来挂起当前线程,直至所有线程都到达brrier状态再同时执行后续任务;
- 第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。
示例
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
// 进行额外操作的 CyclicBarrier
/*CyclicBarrier barrier = new CyclicBarrier(N, new Runnable(){
public void run(){
System.out.println("当前线程"+Thread.currentThread().getName());
}
});*/
for(int i=0;i<N;i++) {
new Writer(barrier).start();
}
// 重用 CyclicBarrier
/*System.out.println("CyclicBarrier重用");
for(int i=0;i<N;i++){
new Writer(barrier).start();
}*/
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
try {
Thread.sleep(5000); //以睡眠来模拟写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
}
}
执行结果为:
- CyclicBarrier:
线程Thread-0正在写入数据... 线程Thread-1正在写入数据... 线程Thread-2正在写入数据... 线程Thread-3正在写入数据... 线程Thread-0写入数据完毕,等待其他线程写入完毕 线程Thread-1写入数据完毕,等待其他线程写入完毕 线程Thread-3写入数据完毕,等待其他线程写入完毕 线程Thread-2写入数据完毕,等待其他线程写入完毕 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务...
- 进行额外操作的 CyclicBarrier:
线程Thread-0正在写入数据... 线程Thread-1正在写入数据... 线程Thread-2正在写入数据... 线程Thread-3正在写入数据... 当前线程Thread-1 线程Thread-0写入数据完毕,等待其他线程写入完毕 线程Thread-1写入数据完毕,等待其他线程写入完毕 线程Thread-3写入数据完毕,等待其他线程写入完毕 线程Thread-2写入数据完毕,等待其他线程写入完毕 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务...
- 重用 CyclicBarrier:
线程Thread-0正在写入数据... 线程Thread-1正在写入数据... 线程Thread-2正在写入数据... 线程Thread-3正在写入数据... 线程Thread-0写入数据完毕,等待其他线程写入完毕 线程Thread-1写入数据完毕,等待其他线程写入完毕 线程Thread-3写入数据完毕,等待其他线程写入完毕 线程Thread-2写入数据完毕,等待其他线程写入完毕 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... CyclicBarrier重用 线程Thread-4正在写入数据... 线程Thread-5正在写入数据... 线程Thread-7正在写入数据... 线程Thread-6正在写入数据... 线程Thread-7写入数据完毕,等待其他线程写入完毕 线程Thread-6写入数据完毕,等待其他线程写入完毕 线程Thread-4写入数据完毕,等待其他线程写入完毕 线程Thread-5写入数据完毕,等待其他线程写入完毕 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务... 所有线程写入完毕,继续处理其他任务...
交换器(Exchanger)
当两个线程在同一个数据缓冲区的两个实例上工作的时候,就可以使用交换器( Exchanger) 典型的情况是,一个线程向缓冲区填人数据,另一个线程消耗这些数据。当它们都完成以后,相互交换缓冲区。
- 用于线程之间(数据缓冲区的)数据交换;
同步队列(SynchronousQueue)
同步队列是一种将生产者与消费者线程配对的机制。当一个线程调用SynchronousQueue的put 方法时,它会阻塞直到另一个线程调用take 方法为止,反之亦然。与Exchanger 的情况不同,数据仅仅沿一个方向传递,从生产者到消费者。 即使SynchronousQueue 类实现了BlockingQueue 接口, 概念上讲, 它依然不是一个队列。它没有包含任何元素,它的size 方法总是返回0。
- 用于将生产者与消费者线程配对;
【???】