查看“核心技术:并发”的源代码
←
核心技术:并发
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:JavaCore]] == 什么是线程 == 一个程序同时执行多个任务,每一个任务称为一个线程(thread), 它是线程控制的简称。 * 进程与线程有的区别,在于每个进程拥有自己的一整套变量, 而线程则共享数据。 *: 共享变量使线程之间的通信比进程之间的通信更有效、更容易。 === 使用线程 === 实现线程的两种方式: # 实现'''Runnable'''接口,利用接口构造Thread对象;(Runnable 是一个函数式接口,可以用lambda 表达式) #: <syntaxhighlight lang="java"> Runnable r = () -> { /*task code*/ }; Thread t = new Thread(r); </syntaxhighlight> # 继承'''Thread'''类,构造一个子类的对象; #: <syntaxhighlight lang="java"> class MyThread extends Thread { public void run() { //task code } } </syntaxhighlight> * 不要调用Thread类或Runnable对象的run方法,而应该调用Thread.start方法。 *: 直接调用run 方法,只会执行同一个线程中的任务,而不会启动新线程。 * 如果有很多任务, 要为每个任务创建一个独立的线程所付出的代价太大了。可以使用线程池来解决这个问题 ==== 相关方法 ==== ===== java.Iang.Thread ===== * Thread( Runnable target ) *: 构造一个新线程, 用于调用给定目标的nm() 方法。 * void start( ) *: 启动这个线程, 将引发调用mn() 方法。这个方法将立即返回, 并且新线程将并发运行。 * void run( ) *: 调用关联 Runnable 的run 方法。 * static void sleep(long minis) *: 休眠给定的毫秒数。参数:millis 休眠的毫秒数 ===== java.lang.Runnable ===== * void run( ) *: 必须覆盖这个方法, 并在这个方法中提供所要执行的任务指令。 == 中断线程 == 当线程的run 方法执行方法体中最后一条语句后,并经由执行return 语句返冋时,或者出现了在方法中没有捕获的异常时,线程将终止。<br> * 在Java 的早期版本中,还有一个stop方法,其他线程可以调用它终止线程。 *: 因为,stop会瞬间强行停止一个线程,<s>且该线程持有的锁并不能释放</s>;所以,这个方法现在已经被弃用了。 * '''[[Java中断及中断响应]]''' === 相关方法 === ===== java.Iang.Thread ===== * void interrupt() *: 向线程发送中断请求。线程的中断状态将被设置为true。如果目前该线程被一个sleep调用阻塞,那么, InterruptedException 异常被抛出。 * static boolean interrupted() *: 测试当前线程(即正在执行这一命令的线程)是否被中断。注意,这是一个静态方法。这一调用会产生副作用—它将当前线程的'''中断状态重置为false'''。 * boolean islnterrupted() *: 测试线程是否被终止。不像静态的中断方法,这一调用不改变线程的中断状态。 * static Thread currentThread() *: 返回代表当前执行线程的Thread 对象。 == 线程状态 == [[File:线程状态.png|500px]] 线程可以有如下6 种状态: # New ( 新创建) # Runnable (可运行) # Blocked ( 被阻塞) # Waiting ( 等待) # Timed waiting (计时等待) # Terminated ( 被终止) * 可调用“getState”方法获取一个线程的当前状态; [[File:Java线程状态图.jpeg|800px]] === 新创建线程 === 用 new 操作符创建一个新线程,未调用 start 方法,该线程还没有开始运行,即 New 状态 === 可运行线程 === 一旦调用 start 方法,则线程处于 runnable 状态。 * (Java 的规范说明没有将它作为一个单独状态:正在运行中的线程仍然处于可运行状态。) 结合线程状态图,理解为:'''“可运行” = “运行” + “就绪”'''; === 被阻塞线程和等待线程 === 当线程处于被阻塞或等待状态时,它暂时不活动。它不运行任何代码且消耗最少的资源。直到线程调度器重新激活它。 * (看到里有点混,于是'''[[Java的阻塞、等待及相关方法]]''') ==== 阻塞(blocked)==== 阻塞:当一个线程'''请求锁''',而该资源被其他线程持有时,该线程'''被置为阻塞状态'''。 * 请求内部的对象锁(非java.util.concurrent库中的锁)。 ==== 等待(waiting)==== 等待:当线程'''等待另一个线程通知调度器一个条件'''时,该线程'''主动进入等待状态'''。<br/> 如: # Object.wait # Thread.join # java.util.concurrent 库中的Lock 或Condition ==== 计时等待(timed waiting)==== 计时等待状态将一直保持到超时期满或者接收到适当的通知。<br/> 如: # Object.wait(long millis) # Thread.join(long millis) # java.util.concurrent 库中的Lock(long millis) 或Condition(long millis) === 被终止的线程(Terminated) === 线程因如下两个原因之一而被终止: # 因为run 方法正常退出而自然死亡。 # 因为一个没有捕获的异常终止了nm 方法而意外死亡。 * 不应该使用“stop”方法来终止线程!(虽然能做到) === 相关方法 === ===== java.iang.Thread ===== * void join() *: 等待指定线程的终止。【在当前线程调用“s.join();”,则必须等待线程s执行完毕,当前线程才能继续执行】 * void join(long mi11is) *: 等待指定的线程死亡或者经过指定的毫秒数。 * Thread.State getState() *: 得到这一线程的状态;NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING 或 TERMINATED 之一。 * void stop() *: 停止该线程。这一方法已过时。 * void suspend() *: 暂停这一线程的执行。这一方法已过时。 * void resume() *: 恢复线程。这一方法仅仅在调用suspend() 之后调用。这一方法已过时。 == 线程属性 == 线程的各种属性,如:线程优先级、守护线程、线程组以及处理未捕获异常的处理器。 <syntaxhighlight lang="java"> private int priority; ... private boolean daemon = false; ... private ThreadGroup group; public final static int MIN_PRIORITY = 1; public final static int NORM_PRIORITY = 5; public final static int MAX_PRIORITY = 10; ... // null unless explicitly set private volatile UncaughtExceptionHandler uncaughtExceptionHandler; // null unless explicitly set private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler; </syntaxhighlight> === 线程优先级(Priority) === * 默认情况下,线程会继承它的父线程的优先级;(可以在 init 方法中发现,同时还会继承 “group”、“daemon”) * 可以用setPriority 方法提高或降低任何一个线程的优先级。 * 不要将程序构建为功能的正确性依赖于优先级。 <pre> 如果确实要使用优先级, 应该避免初学者常犯的一个错误。如果有几个高优先级的线程没有进入非活动状态, 低优先级的线程可能永远也不能执行。每当调度器决定运行一个新线程时, 首先会在具有高优先级的线程中进行选择, 尽管这样会使低优先级的线程完全饿死。 </pre> 【???不用动态优先级吗,还是和底层实现有关?】 ==== 相关方法 ==== ===== java.lang.Thread ===== * void setPriority(int newPriority) *: 设置线程的优先级。优先级必须在Thread.MIN_PRIORITY 与Thread.MAX_PRIORITY之间。一般使用Thread.NORMJ»RIORITY 优先级。 * static int MIN_PRIORITY *: 线程的最小优先级。最小优先级的值为1。 * static int N0RM_PRI0RITY *: 线程的默认优先级。默认优先级为5。 * static int MAX—PRIORITY *: 线程的最高优先级。最高优先级的值为10。 * static void yield( ) *: 导致当前执行线程处于让步状态。如果有其他的可运行线程具有至少与此线程同样高的优先级,那么这些线程接下来会被调度。注意,这是一个静态方法。【让出CPU】 === 守护线程(Daemon) === * 守护线程的唯一用途是为其他线程提供服务,当只剩下守护线程时,虚拟机就退出了。 * 可以通过调用“t .setDaemon(true);”将线程转换为守护线程(daemon thread)。 * 守护线程应该永远不去访问固有资源, 如文件、数据库,因为它会在任何时候甚至在一个操作的中间发生中断。 ==== 相关方法 ==== ===== java.lang.Thread ===== * void setDaemon( boolean isDaemon ) *: 标识该线程为守护线程或用户线程。这一方法必须在线程启动之前调用。 === 线程组(ThreadGroup) === <pre> 线程组是一个可以统一管理的线程集合。默认情况下,创建的所有线程属于相同的线程组,但是,也可能会建立其他的组。现在引入了更好的特性用于线程集合的操作,所以建议不要在自己的程序中使用线程组。 </pre> ==== 相关方法 ==== ===== java.lang.ThreadGroup ===== * void UncaughtException( Thread t, Throwable e) *: 如果有父线程组,调用父线程组的这一方法; 或者,如果Thread 类有默认处理器,调用该处理器,否则,输出栈轨迹到标准错误流上(但是, 如果e 是一个ThreadDeath对象,栈轨迹是被禁用的。ThreadDeath 对象由stop 方法产生, 而该方法已经过时)。 === 未捕获异常处理器(uncaughtExceptionHandler) === * 线程的run 方法不能抛出任何受查异常。非受査异常会导致线程终止,在线程死亡之前,异常被传递到一个用于未捕获异常的处理器。 * 该处理器必须属于一个实现'''Thread.UncaughtExceptionHandler'''接口的类,这个接口只有—个方法“void uncaughtException(Thread t, Throwable e);” ==== 相关方法 ==== ===== java.lang.Thread ===== * static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler ) * static Thread.UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() *: 设置或获取未捕获异常的默认处理器。 * void setUncaughtExceptionHandler( Thread.UncaughtExceptionHandlerhandler ) * Thread.UncaughtExceptionHandler getUncaughtExceptionHandler( ) *: 设置或获取未捕获异常的处理器。如果没有安装处理器, 则将线程组对象作为处理器。 ===== java.Iang.Thread.UncaughtExceptionHandler ===== * void UncaughtException(Thread t , Throwable e) *: 当一个线程因未捕获异常而终止, 按规定要将客户报告记录到日志中。 *: 参数:t 由于未捕获异常而终止的线程;e 未捕获的异常对象; == 同步 == 多个线程共享同一数据时,线程发出一个方法调用,在没有得到结果之前,这个调用就不返回,同时其它的线程也不能调用这个方法。 [[File:非同步线程与同步线程的比较.png|400px]] === 锁对象 === 有两种机制防止'''代码块'''受并发访问的干扰: # Java 语言提供一个'''synchronized''' 关键字 # 并且Java SE 5.0 引入了'''ReentrantLock''' 类。 * 把解锁操作括在finally 子句之内是至关重要的。如果在临界区的代码抛出异常,锁必须被释放。否则,其他线程将永远阻塞。 * 如果使用锁, 就不能使用带资源的try 语句。 * 锁是可重入的, 因为线程可以重复地获得已经持有的锁。锁保持一个持有计数( holdcount ) 来跟踪对lock 方法的嵌套调用。线程在每一次调用lock 都要调用unlock 来释放锁。由于这一特性, 被一个锁保护的代码可以调用另一个使用相同的锁的方法。 ==== 示例: ==== <syntaxhighlight lang="java"> public class Bank { private Lock bankLock = new ReentrantLock0;// ReentrantLock implements the Lock interface ... public void transfer(int from, int to, int amount) { bankLock.lock(); try { System.out.print(Thread.currentThread0); accounts[from] -= amount; System.out.printf(" X10.2f from %A to Xd", amount, from, to); accounts[to] += amount; System.out.printf(" Total Balance: X10.2fXn", getTotalBalanceO); } finally { banklock.unlockO; } } } </syntaxhighlight> 如上: # transfer 方法调用getTotalBalance 方法, 这也会封锁bankLock 对象,此时bankLock对象的持有计数为2。 # 当getTotalBalance 方法退出的时候, 持有计数变回1。 # 当transfer 方法退出的时候, 持有计数变为0。线程释放锁。 ==== 相关方法 ==== ===== java.util.concurrent.locks.Lock ===== * void lock( ) *: 获取这个锁;如果锁同时被另一个线程拥有则发生阻塞。 * void unlock( ) *: 释放这个锁。 ===== java,util.concurrent.locks.ReentrantLock ===== * ReentrantLock( ) *: 构建一个可以被用来保护临界区的可重入锁。 * ReentrantLock(boo1ean fair ) *: 构建一个带有公平策略的锁。一个公平锁偏爱等待时间最长的线程。但是,这一公平的保证将大大降低性能。所以, 默认情况下, 锁没有被强制为公平的。 === 条件对象(Condition) === 条件对象用于管理那些已经获得了一个锁但是却不能做有用工作的线程。 ==== 使用 ==== # 一个锁对象可以有一个或多个相关的条件对象,用 '''newCondition''' 方法获得一个条件对象。 #: <syntaxhighlight lang="java"> class Bank { private Condition sufficientFunds; public BankO { sufficientFunds = bankLock.newConditionO; } } </syntaxhighlight> # 当条件不满足时,调用'''await''': #: <syntaxhighlight lang="java"> sufficientFunds.await(); </syntaxhighlight> #: 当前线程现在被阻塞了,并放弃了锁。 #: 一旦一个线程调用await方法, 它进人该条件的等待集。当锁可用时,该线程不能马上解除阻塞。相反,它处于阻塞状态,直到另一个线程调用同一条件上的signalAll 方法时为止。 # 重新激活因为这一条件而等待的所有线程'''signalAll''': #: <syntaxhighlight lang="java"> sufficientFunds,signalAll(); </syntaxhighlight> * 至关重要的是最终需要某个其他线程调用signalAll 方法 * 调用signalAll 不会立即激活一个等待线程。它仅仅解除等待线程的阻塞, 以便这些线程可以在当前线程退出同步方法之后, 通过竞争实现对对象的访问。 ==== 示例: ==== <syntaxhighlight lang="java"> public void transfer(int from, int to, int amount) { bankLock.lockO; try { while (accounts[from] < amount) sufficientFunds.awaitO; // transfer funds ... sufficientFunds.signalAll(); } finally { bankLock.unlock0; } } </syntaxhighlight> === synchronized 关键字 === 有关锁和条件的关键之处: * 锁用来保护代码片段,任何时刻只能有一个线程执行被保护的代码。 * 锁可以管理试图进入被保护代码段的线程。 * 锁可以拥有一个或多个相关的条件对象。 * 每个条件对象管理那些已经进入被保护的代码段但还不能运行的线程。 ==== synchronized 与 wait、notifyAll ==== <pre> 从1.0 版开始,Java中的每一个对象都有一个内部锁。如果一个方法用 synchronized 关键字声明,那么对象的锁将保护整个方法。也就是说,要调用该方法, 线程必须获得内部的对象锁。 内部对象锁只有一个相关条件。wait 方法添加一个线程到等待集中,notifyAll /notify 方法解除等待线程的阻塞状态。 </pre> * '''每一个对象有一个内部锁, 并且该锁有一个内部条件。''' 即: # <syntaxhighlight lang="java"> public synchronized void methodO { method body } </syntaxhighlight> #: 等价于 #: <syntaxhighlight lang="java"> public void methodQ { this.intrinsidock.1ock(); try { method body } finally { this.intrinsicLock.unlockO; } } </syntaxhighlight> # wait 、notityAll等价于 #: <syntaxhighlight lang="java"> intrinsicCondition.await(); intrinsicCondition.signalAll(); </syntaxhighlight> ===== 示例 ===== <syntaxhighlight lang="java" highlight="4,7,10"> class Bank { private double[] accounts; public synchronized void transfer(int from,int to, int amount) throws InterruptedException { while (accounts[from] < amount) wait(); // wait on intrinsic object lock's single condition accounts[from] -= amount ; accounts[to] += amount ; notifyAll(); // notify all threads waiting on the condition } public synchronized double getTotalBalanceO { . . . } } </syntaxhighlight> ==== Synchronized 使用 ==== * 将静态方法声明为 synchronized 时,该方法获得相关的类对象的内部锁,没有其他线程可以调用同一个类的这个或任何其他的同步静态方法。 *: 例如, 如果Bank 类有一个静态同步的方法,那么当该方法被调用时,Bank.class对象的锁被锁住。 内部锁和条件存在一些局限。包括: # 不能中断一个正在试图获得锁的线程。 # 试图获得锁时不能设定超时。 # 每个锁仅有单一的条件, 可能是不够的。 * 最好使用JUC框架('''java.util.concurrent''')提供的锁,而不是“Lock/Condition”和“synchronized”。 ==== 相关方法 ==== ===== java.lang.Object ===== * void notifyAl1 () *: 解除那些在该对象上调用wait 方法的线程的阻塞状态。该方法只能在同步方法或同步块内部调用。如果当前线程不是对象锁的持有者,该方法拋出一个IllegalMonitorStateException异常。 * void notify( ) *: 随机选择一个在该对象上调用wait 方法的线程, 解除其阻塞状态。该方法只能在一个同步方法或同步块中调用。如果当前线程不是对象锁的持有者, 该方法抛出一个IllegalMonitorStateException 异常。 * void wait( ) *: 导致线程进人等待状态直到它被通知。该方法只能在一个同步方法中调用。如果当前线程不是对象锁的持有者,该方法拋出一个IllegalMonitorStateException 异常。 * void wait( long mi11is) * void wait( long mi 11Is, int nanos ) *: 导致线程进入等待状态直到它被通知或者经过指定的时间。这些方法只能在一个同步方法中调用。如果当前线程不是对象锁的持有者该方法拋出一个IllegalMonitorStateException异常。 *: 参数: millis 毫秒数;nanos 纳秒数,<1 000 000 === 同步阻塞 === 除了调用同步方法,还有另一种机制可以获得锁:通过进入一个同步阻塞。<br/> 如: <syntaxhighlight lang="java" highlight="4,8"> public class Bank { private doublet[] accounts; private Object lock = new Object(); ... public void transfer(int from, int to, int amount) { synchronized (lock) // an ad-hoc lock { accounts[from] -= amount; accounts[to] += amount; } System.out.print1n( ... ); } } </syntaxhighlight> * lock 对象被创建仅仅是用来使用每个Java 对象持有的锁。 === 监视器概念(monitor)=== 监视器可以在不需要程序员考虑如何加锁的情况下,就可以保证多线程的安全性。<br/> 监视器具有如下特性: # 监视器是只包含私有域的类。 # 每个监视器类的对象有一个相关的锁。 # 使用该锁对所有的方法进行加锁。换句话说,如果客户端调用obj.methood() , 那么obj对象的锁是在方法调用开始时自动获得,并且当方法返回时自动释放该锁。因为所有的域是私有的,这样的安排可以确保一个线程在对对象操作时,没有其他线程能访问该域。 # 该锁可以有任意多个相关条件。 Java中以不是很精确的方式采用了监视器概念: # 用synchronized 关键字声明方法 # 通过调用wait/notifyAU/notify 来访问条件变量 Java 对象不同于监视器, 从而使得线程的安全性下降: # 域不要求必须是private。 # 方法不要求必须是synchronized。 # 内部锁对客户是可用的。【?】 === Volatile 域 === volatile 关键字为实例域的同步访问提供了一种'''免锁机制''': :如果声明一个域为volatile,那么编译器和虚拟机就知道该域是可能被另一个线程并发更新的。 * volatile 不保证原子性 === 原子性 === '''java.util.concurrent.atomic''' 包中有很多类使用了很高效的'''机器级指令'''(而不是使用锁) 来保证其他操作的原子性(操作不会中断)。 === 死锁(deadlock) === # 互斥访问:一个资源每次只能被一个线程使用。 # 请求与保持:一个线程因请求资源而阻塞时,对已获得的资源保持不放。 # 不可抢占:线程已获得的资源,在未使用完之前,不能强行剥夺。 # 循环等待:若干线程之间形成一种头尾相接的循环等待资源关系。 相关: * 银行家算法 === 线程局部变量(ThreadLocal) === 使用'''ThreadLocal'''辅助类为各个线程提供各自的实例。 如: <syntaxhighlight lang="java"> public static final ThreadLocal<SimpleDateFormat> dateFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd")); String dateStamp = dateFormat.get().format(new Date()); </syntaxhighlight> * 在一个给定线程中首次调用get时,会调用 initialValue 方法;此后,get 方法会返回属于当前线程的那个实例。 ==== 相关方法 ==== ===== java.lang.ThreadLocal<T> ===== * T get( ) *: 得到这个线程的当前值。如果是首次调用get, 会调用initialize 来得到这个值。 * protected initialize( ) *: 应覆盖这个方法来提供一个初始值。默认情况下,这个方法返回mill。 * void set(T t) *: 为这个线程设置一个新值。 * void remove( ) *: 删除对应这个线程的值。 * <nowiki>static <S> ThreadLocal <S> withlnitial ( Supplier< ? extends S> supplier )</nowiki> *: 创建一个线程局部变量, 其初始值通过调用给定的supplier 生成。 ===== java.util.concurrent.ThreadLocalRandom ===== * static ThreadLocalRandom current( ) *: 返回特定于当前线程的Random 类实例。 === 锁测试域超时 === 应该更加谨慎地申请锁,以避免调用lock方法可能发生的阻塞。 # lock:一直申请锁,操作无法取消,不能被中断; # tryLock:成功获得锁后返回true, 否则, 立即返回false, 且线程可以立即离开去做其他事情;【响应中断】 # lockInterruptibly:(相当于一个无线超时的tryLock)【响应中断】 ==== 相关方法 ==== ===== java.util.concurrent.locks.Lock ===== * boolean tryLock() *: 尝试获得锁而没有发生阻塞;如果成功返回真。这个方法会抢夺可用的锁,即使该锁有公平加锁策略,即便其他线程已经等待很久也是如此。 * boolean tryLock(long time, TimeUnit unit) *: 尝试获得锁,阻塞时间不会超过给定的值;如果成功返回true。 * void lockInterruptibly() *: 获得锁,但是会不确定地发生阻塞。如果线程被中断,抛出一个InterruptedException异常。 ===== java.util.concurrent.locks.Condition ===== * boolean await( 1ong time , TimeUnit unit ) *: 进人该条件的等待集,直到线程从等待集中移出或等待了指定的时间之后才解除阻塞。如果因为等待时间到了而返回就返回false,否则返回true。 * void awaitUninterruptibly( ) *: 进人该条件的等待集,直到线程从等待集移出才解除阻塞。如果线程被中断,该方法不会抛出InterruptedException 异常。 === 读/写锁(ReenterantReadWriteLock)=== “java.util.concurrent.locks” 包定义了两个锁类:“ReentrantLock”类和'''ReentrantReadWriteLock'''类。 * 二者实现了不同的类: *: <syntaxhighlight lang="java"> public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { public class ReentrantLock implements Lock, java.io.Serializable { </syntaxhighlight> * ReentrantReadWriteLock 允许读线程共享访问,写线程互斥; 使用步骤: # 构造一个ReentrantReadWriteLock 对象: #: <syntaxhighlight lang="java"> private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(): </syntaxhighlight> # 抽取读锁和写锁: #: <syntaxhighlight lang="java"> private Lock readLock = rwl.readLock(); private Lock writeLock = rwl.writeLock(); </syntaxhighlight> # 对所有的获取方法加读锁: #: <syntaxhighlight lang="java"> public double getTotalBalance() { readLock.lock(); try { . . . } finally { readLock.unlock(); } } </syntaxhighlight> # 对所有的修改方法加写锁: #: <syntaxhighlight lang="java"> public void transfer(. . .) { writeLock.lock(); try { . . . } finally { writeLock.unlock(); } } </syntaxhighlight> ==== 相关方法 ==== ===== java.util.concurrent.locks.ReentrantReadWriteLock ===== * Lock readLock( ) *: 得到一个可以被多个读操作共用的读锁, 但会排斥所有写操作。 * Lock writeLock( ) *: 得到一个写锁, 排斥所有其他的读操作和写操作。 === 为什么弃用 stop 和 suspend 方法 === stop、suspend 和resume 方法已经弃用:stop 方法天生就不安全,经验证明suspend 方法(resume 用于suspend之后恢复)会经常导致死锁。 ==== stop ==== 该方法终止所有未结束的方法,包括run 方法。当线程被终止,立即释放被它锁住的所有对象的锁【而并非导致锁不能被释放】。这会'''导致对象处于不一致的状态'''。 : 例如,假定TransferThread在从一个账户向另一个账户转账的过程中被终止,钱款已经转出,却没有转人目标账户,现在银行对象就被破坏了。因为锁已经被释放,这种破坏会被其他尚未停止的线程观察到。 见源码注释: <pre> * @deprecated This method is inherently unsafe. Stopping a thread with * Thread.stop causes it to unlock all of the monitors that it * has locked (as a natural consequence of the unchecked * <code>ThreadDeath</code> exception propagating up the stack). If * any of the objects previously protected by these monitors were in * an inconsistent state, the damaged objects become visible to * other threads, potentially resulting in arbitrary behavior. Many * uses of <code>stop</code> should be replaced by code that simply * modifies some variable to indicate that the target thread should * stop running. The target thread should check this variable * regularly, and return from its run method in an orderly fashion * if the variable indicates that it is to stop running. If the * target thread waits for long periods (on a condition variable, * for example), the <code>interrupt</code> method should be used to * interrupt the wait. </pre> ==== suspend ==== 与stop 不同,suspend 不会破坏对象。但是,如果'''用suspend 挂起一个持有一个锁的线程,那么该锁在恢复之前是不可用的'''。 : 如果调用suspend 方法的线程试图获得同一个锁,那么程序死锁: == 阻塞队列(BlockingQueue) == 阻塞队列(blocking queue):当试图向队列添加元素而队列已满,或是想从队列移出元素而队列为空的时候,其会导致线程阻塞。 === 阻塞队列方法 === [[File:阻塞队列方法.png|800px]] <br/> 以队列满或空时它们的响应方式,阻塞队列方法分为3 类: # 抛出异常:“add”、“element”、“remove”; # 阻塞:“put”、“take”; # 返回false或null为提示:“offer”、“peek”、“poll”;(多线程中应采用) === 阻塞队列类型 === # '''LinkedBlockingQueue''':'''双端'''阻塞队列,默认容量没有边界(可指定容量); # '''ArrayBlockingQueue''':在构造时需要指定容量,并且有一个可选的参数来指定是否需要公平性; #: 若设置了公平参数, 则那么等待了最长时间的线程会优先得到处理。(通常,公平性会降低性能, 只有在确实非常需要时才使用它。) #: # '''PriorityBlockingQueue''':是一个'''带优先级的队列,而不是先进先出队列'''。元素按照它们的优先级顺序被移出; #: 该队列是没有容量上限,但是,如果队列是空的,取元素的操作会阻塞。(有关优先级队列的详细内容参看第9 章。) # '''DelayQueue''':'''延迟'''阻塞队列; #: DelayQueue 包含实现Delayed 接口的对象(“Delayed”:<syntaxhighlight lang="java" inline>interface Delayed extends Comparable<Delayed></syntaxhighlight>),所以还必须实现compareTo 方法; #: “getDelay”方法返回对象的残留延迟:负值表示延迟已经结束。('''元素只有在延迟用完的情况下才能从DelayQueue 移除''') * 其中均使用了“ReentrantLock”(域或局部变量): *: <syntaxhighlight lang="java"> private final ReentrantLock lock; </syntaxhighlight> * 其中均继承了“AbstractQueue<E>”、实现了“BlockingQueue<E>”和“java.io.Serializable”: *: <syntaxhighlight lang="java"> public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { </syntaxhighlight> * JavaSE 7 增加了一个TranSferQueUe 接口,允许生产者线程等待,直到消费者准备就绪可以接收一个元素。 *: 如果生产者调用<syntaxhighlight lang="java" inline>q.transfer(iteni);</syntaxhighlight>;这个调用会阻塞, 直到另一个线程将元素(item) 删除。 === 相关方法 === ===== java.util.concurrent.ArrayBlockingQueue<E> ===== * ArrayBlockingQueue( int capacity) * ArrayBlockingQueue( int capacity, boolean fair ) *: 构造一个带有指定的容量和公平性设置的阻塞队列。该队列用循环数组实现。 ===== java,util.concurrent.LinkedBlockingQueue<E> ===== ===== java.util.concurrent.LinkedBlockingDeque<E> ===== * LinkedBlockingQueue( ) * LinkedBlockingDeque( ) *: 构造一个无上限的阻塞队列或双向队列,用链表实现。 * LinkedBlockingQueue( int capacity ) * LinkedBlockingDeque( int capacity ) *: 根据指定容量构建一个有限的阻塞队列或双向队列,用链表实现。 ===== java.util.concurrent.DelayQueue<E extends Delayed> ===== * DelayQueue( ) *: 构造一个包含Delayed 元素的无界的阻塞时间有限的阻塞队列。只有那些延迟已经超过时间的元素可以从队列中移出。 ===== java.util.concurrent.Delayed ===== * long getDelay(Timellnit unit ) *: 得到该对象的延迟,用给定的时间单位进行度量。 ===== java.util.concurrent.PriorityBlockingQueue<E> ===== * PriorityBlockingQueue( ) * Priori tyBl ockingQueiie( int init i alCapaci ty) * PriorityBlockingQueue( int initialCapacity, Comparator ? super E> comparator ) *: 构造一个无边界阻塞优先队列,用堆实现。 *: 参数: initialCapacity 优先队列的初始容量。默认值是11。comparator 用来对元素进行比较的比较器,如果没有指定,则元素必须实现Comparable 接口。 ===== java.util.concurrent.BlockingQueue<E> ===== * void put ( E element ) *: 添加元素,在必要时阻塞。 * E take( ) *: 移除并返回头元素, 必要时阻塞。 * boolean offer ( E element , long time , Timellnit unit ) *: 添加给定的元素, 如果成功返回true, 如果必要时阻塞, 直至元素已经被添加或超时。 * E poll ( long time , Timellni t unit ) *: 移除并返回头元素, 必要时阻塞, 直至元素可用或超时用完。失败时返回null。 ===== java.util.concurrent.BiockingDeque<E> ===== * void putFirst( E element ) * void putLast( E element ) *: 添加元素,必要时阻塞。 * E takeFirst ( ) * E takeLast ( ) *: 移除并返回头元素或尾元素, 必要时阻塞。 * boolean offerFirst ( E element , long time , Timellni t unit ) * boolean offerLast ( E element , long time , TimeUnit unit ) *: 添加给定的元素, 成功时返回true, 必要时阻塞直至元素被添加或超时。 * E pol1First( 1 ong time , TimeUnit unit ) * E pol1Last ( 1 ong time , Timellnit unit ) *: 移动并返回头元素或尾元素,必要时阻塞, 直至元素可用或超时。失败时返回mill。 ===== java.util.concurrent.Transfer Queue<E> ===== * void transfer ( E element ) * boolean tryTransfer ( E element , long time , TimeUnit unit ) *: 传输一个值,或者尝试在给定的超时时间内传输这个值,这个调用将阻塞,直到另一个线程将元素删除。第二个方法会在调用成功时返回true。 == 线程安全的集合 == 如果多线程要并发地修改一个数据结构,那么很容易会破坏这个数据结构: : 可以通过提供锁来保护共享数据结构,但是选择线程安全的实现作为替代可能更容易些。 === 高效的 Map、Set 和 Queue === java.util.concurrent 包提供了映射、有序集和队列的高效实现:'''ConcurrentHashMap'''、'''ConcurrentSkipListMap'''、'''ConcurrentSkipListSet'''和'''ConcurrentLinkedQueue'''。<br/> * 这些集合使用复杂的算法,通过允许并发地访问数据结构的不同部分来使竞争极小化。 * 与大多数集合不同,size 方法不必在常量时间内操作。确定这样的集合当前的大小通常需要遍历。 ==== ConcurrentHashMap ==== <pre> 有些应用使用庞大的并发散列映射,这些映射太过庞大,以至于无法用size 方法得到它的大小,因为这个方法只能返回int。对于一个包含超过20 亿条目的映射该如何处理? JavaSE 8 引入了一个mappingCount 方法可以把大小作为long 返回。 </pre> * '''mappingCount''':用于返回映射数量(ConcurrentHashMap包含的映射可能多于可以表示为int的映射,所以不应该使用“size()”); *: 返回的值是一个估计值;如果存在并发的插入或删除操作,则实际计数可能会有所不同。 *: <syntaxhighlight lang="java"> /** * Returns the number of mappings. This method should be used * instead of {@link #size} because a ConcurrentHashMap may * contain more mappings than can be represented as an int. The * value returned is an estimate; the actual count may differ if * there are concurrent insertions or removals. * * @return the number of mappings * @since 1.8 */ public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values } </syntaxhighlight> <pre> 并发的散列映射表,可高效地支持大量的读者和一定数量的写者。默认情况下,假定可以有多达16 个写者线程同时执行。可以有更多的写者线程,但是,如果同一时间多于16个,其他线程将暂时被阻塞。可以指定更大数目的构造器,然而,恐怕没有这种必要。 散列映射将有相同散列码的所有条目放在同一个 桶”中。有些应用使用的散列函数不当,以至于所有条目最后都放在很少的桶中,这会严重降低性能。即使是一般意义上还算合理的散列函数,如String 类的散列函数,也可能存在问题。例如,攻击者可能会制造大量有相同散列值的字符串,让程序速度减慢。在JavaSE 8 中,并发散列映射将桶组织为树,而不是列表,键类型实现了Comparable,从而可以保证性能为O(log(n))。 </pre> * 在JavaSE 8 中,'''ConcurrentHashMap将桶组织为树,而不是列表''',键类型实现了“Comparable”,从而可以保证性能为O(log(n)); ==== 关于迭代器? ==== <pre> 集合返回弱一致性( weakly consistent) 的迭代器。这意味着迭代器不一定能反映出它们被构造之后的所有的修改, 但是, 它们不会将同一个值返回两次,也不会拋出ConcurrentModificationException 异常。 与之形成对照的是, 集合如果在迭代器构造之后发生改变,java.util 包中的迭代器将抛出一个ConcurrentModificationException 异常。 </pre> * 集合如果在迭代器构造之后发生改变,java.util 包中的迭代器将抛出一个'''ConcurrentModificationException'''异常。 【???】 ==== 相关方法 ==== ===== java.util.concurrent.ConcurrentLinkedQueue<E> ===== * ConcurrentLinkedQueue< E>() *: 构造一个可以被多线程安全访问的无边界非阻塞的队列。 ===== java.util.concurrent.ConcurrentLinkedQueue<E> ===== * ConcurrentSkipListSet<E>() * ConcurrentSkipListSet<E>(Comparator<? super E> comp) *: 构造一个可以被多线程安全访问的有序集。第一个构造器要求元素实现Comparable接口。 ===== java.util.concurrent.ConcurrentHashMap<K, V> ===== ===== java.util.concurrent.ConcurrentSkipListMap<K, V> ===== * ConcurrentHashMapCK, V>() * ConcurrentHashMap<K, V>(1nt 1n1t1 alCapacity) * ConcurrentHashMapCK, V>(int initialCapacity, float 1oadFactor, 1nt concurrencyLevel) *: 构造一个可以被多线程安全访问的散列映射表。 *: 参数: initialCapacity 集合的初始容量。默认值为16。 *: loadFactor 控制调整: 如果每一个桶的平均负载超过这个因子,表的大小会被重新调整。默认值为0.75。 *: concurrencyLevel 并发写者线程的估计数目。 * ConcurrentSkipListMap<K, V>() * ConcurrentSkipListSet<K, V>(Comparator<? super K> comp) *: 构造一个可以被多线程安全访问的有序的映像表。第一个构造器要求键实现Comparable 接口0 === Map 条目的原子更新 === * 线程安全:是指线程之间对同一个对象的访问不会出现同步错误; * 原子性:是指操作过程(在cpu、寄存器中执行时)不会被打断; 映射条目的原子更新,不仅讨论“ConcurrentHashMap”,其他基本的“Map”亦是如此。 ==== replace 方法更新 ==== 传统的做法是使用'''replace''' 操作: * 它会以原子方式用一个新值替换原值,前提是之前没有其他线程把原值替换为其他值。 *(使用do-while:必须一直这么做,直到replace 成功) <syntaxhighlight lang="java"> do { oldValue = map.get(word); newValue = oldValue = null ? 1 : oldValue + 1; } while (!map.replace(word, oldValue, newValue)); </syntaxhighlight> ==== 使用 AtomicLong 类型的Value ==== * '''AtomicLong'''和'''LongAdder''':数据类型类,其操作都是原子操作(都是“java.util.concurrent.atomic”包下的类); *:(区别于非原子操作的基本数据类型,如:“int”、“String”等) *:<syntaxhighlight lang="java"> public class AtomicLong extends Number implements java.io.Serializable { </syntaxhighlight> *:<syntaxhighlight lang="java"> public class LongAdder extends Striped64 implements Serializable { </syntaxhighlight> '''使用“AtomicLong”代替基本数据类型,作为“ConcurrentHashMap<K,V>”中 V 的类型''',则,对于数据的操作都是原子操作: # 即构造“ConcurrentHashMap<String,AtomicLong>”; # 或者在Java SE 8 中,还可以构造“ConcurrentHashMap<String,LongAdder>”; 如: <syntaxhighlight lang="java"> map.putlfAbsent(word, new LongAdder()); map.get(word).increment(); </syntaxhighlight> ==== 其他方法 ==== Java SE 8 提供了一些可以更方便地完成原子更新的方法: # compute:用于计算新值,需要提供一个键和一个计算新值的函数: #: <syntaxhighlight lang="java"> map.compute(word, (k, v) -> v = null ? 1: v + 1); </syntaxhighlight> # computelfPresent:已经有原值的情况下计算新值; # computelfAbsent:没有原值的情况下计算新值; * ConcurrentHashMap 中不'''允许有null 值'''。有很多方法都使用null 值来指示映射中某个给定的键不存在。 === ConcurrentHashMap 的批操作(search、reduce、forEach)=== Java SE 8 为 ConcurrentHashMap 提供了批操作,即使有其他线程在处理映射,这些操作也能安全地执行: * 批操作会遍历映射,处理遍历过程中找到的元素。 * 有3 种不同的操作: *# 搜索(search) :为每个键或值提供一个函数,直到函数生成一个非null 的结果。然后搜索终止,返回这个函数的结果。 *# 归约(reduce) :组合所有键或值, 这里要使用所提供的一个累加函数。 *# forEach:为所有键或值提供一个函数。 * 每个操作都有4 个版本: *# operationKeys:处理键。 *# operatioriValues:处理值。 *# operation:处理键和值。 *# operatioriEntries:处理Map.Entry 对象。 * 对于上述各个操作,需要指定一个参数化阈值(parallelism threshold):'''如果映射包含的元素多于这个阈值,就会并行完成批操作'''。 *# 如果希望批操作在一个线程中运行,可以使用阈值“Long.MAX_VALUE”; *# 如果希望用尽可能多的线程运行批操作,可以使用阈值“1”; * 对于int、long 和double 输出还有相应的特殊化操作, 分别有后缀“Tolnt”、“ToLong”和“ToDouble”。 *: 如:educeValuesToLong()、educeValuesTolnt() ==== 示例 ==== 如,“search”的操作: # U searchKeys(long threshold, BiFunction<? super K , ? extends U> f) # U searchValues(long threshold, BiFunction<? super V, ? extends U> f) # U search(long threshold, BiFunction<? super K, ? super V,? extends U> f) # U searchEntries(long threshold, BiFunction<Map.Entry<K, V>, ? extends U> f) 如,“search”的使用: “找出第一个出现次数超过1000 次的单词。需要搜索键和值” <syntaxhighlight lang="java"> String result = map.search(threshold, (k, v) -> v > 1000 ? k : null ); </syntaxhighlight> === 并发视图集 === <pre> 假设你想要的是一个大的线程安全的集而不是映射。并没有一个ConcurrentHashSet 类,而且你肯定不想自己创建这样一个类。当然,可以使用ConcurrentHashMap ( 包含“ 假” 值),不过这会得到一个映射而不是集, 而且不能应用Set 接口的操作。 静态newKeySet 方法会生成一个Set<K>, 这实际上是ConcurrentHashMap<K, Boolean〉的一个包装器。(所有映射值都为Boolean.TRUE, 不过因为只是要把它用作一个集, 所以并不关心具体的值。) Set<String> words = ConcurrentHashMap.<String>newKeySet() ; 当然, 如果原来有一个映射,keySet 方法可以生成这个映射的键集。这个集是可变的。如果删除这个集的元素,这个键(以及相应的值)会从映射中删除。不过, 不能向键集增加元素,因为没有相应的值可以增加。Java SE 8 为ConcurrentHashMap 增加了第二个keySet 方法,包含一个默认值,可以在为集增加元素时使用: Set<String> words = map.keySet (1L) ; words.add("]'ava”); 如果"Java” 在words 中不存在, 现在它会有一个值 1。 </pre> === 写数组的拷贝(CopyOnWriteArrayList、CopyOnWriteArraySet) === '''CopyOnWriteArrayList''' 和'''CopyOnWriteArraySet''' 是线程安全的集合,其中所有的修改线程对底层数组进行复制。 当构建一个迭代器的时候, 它包含一个对当前数组的引用。 如果数组后来被修改了,迭代器仍然引用旧数组, 但是,集合的数组已经被替换了。因而,旧的迭代器拥有一致的(可能过时的)视图,访问它无须任何同步开销。 * 即:'''迭代器对原集合进行访问,而copy一份数组进行修改操作,在执行完修改操作后将原来集合指向新的集合来完成修改操作'''; * '''加了写锁的ArrayList和ArraySet,锁住的是整个对象,但读操作可以并发执行''' === 并行数组算法 === 在Java SE 8 中, Arrays 类提供了大量并行化操作。 如,静态Arrays.parallelSort 方法可以对一个基本类型值或对象的数组排序。 <syntaxhighlight lang="java"> String contents = new String(Fi1es.readAl1Bytes(Paths.get("alice.txt")), StandardCharsets.UTFJ); // Read file into string String[] words = contents.split("[\\P{L}]+"); // Split along nonletters Arrays,parallelSort(words): </syntaxhighlight> === 较早的线程安全集合 === 从Java 的初始版本开始,'''Vector''' 和'''Hashtable''' 类就提供了线程安全的动态数组和散列表的实现。 现在这些类被弃用了, 取而代之的是AnayList 和HashMap 类。这些类不是线程安全的,而集合库中提供了不同的机制。 任何集合类都可以通过使用同步包装器( '''synchronizationwrapper''') 变成线程安全的: <syntaxhighlight lang="java"> List<E> synchArrayList = Col lections ,synch ronizedList (new ArrayList<E>()) ; Map<K, V> synchHashMap = Col1ections.synchroni zedMap(new HashMap<K, V>()); </syntaxhighlight> 结果集合的方法使用锁加以保护, 提供了线程安全访问。 * 最好使用“java.Util.Concurrent”包中定义的集合,不使用同步包装器中的。特别是,假如它们访问的是不同的桶,由于ConcurrentHashMap 已经精心地实现了, 多线程可以访问它而且不会彼此阻塞。有一个例外是经常被修改的数组列表。在那种情况下, 同步的ArrayList 可以胜过CopyOnWriteArrayList()。 * 参见'''[[集合与同步集合的比较]]''' ==== 相关方法 ==== ===== java.util.CoIlections ===== * static <E> Collect1 on<E> synchronizedCollection(Collection<E> c) * static <E> List synchronizedList(List<E> c) * static <E> Set synchronizedSet(Set< E> c) * static <E> SortedSet synchronizedSortedSet(SortedSet <E> c) * static <K, V > Map<K, V > synchronizedMap(Map<K, V > c) * static <K, V> SortedMap<K, V> synchronizedSortedMap(SortedMap<K, V> c) *: 构建集合视图, 该集合的方法是同步的。 == 异步运行接口(Callable 与 Future)== * '''Runnable''':封装一个异步运行的任务,可以把它想象成为一个没有参数和返回值的异步方法。 *: <syntaxhighlight lang="java"> package java.lang; @FunctionalInterface public interface Runnable { public abstract void run(); } </syntaxhighlight> * '''Callable''':与Runnable 类似,但是有返回值。 *: <syntaxhighlight lang="java"> package java.util.concurrent; @FunctionalInterface public interface Callable<V> { V call() throws Exception; } </syntaxhighlight> ** 类型参数是返回值的类型。如<syntaxhighlight lang="java" inline>Callable<Integer></syntaxhighlight>表示一个最终返回Integer对象的异步计算。 * '''Future''':保存异步计算的结果; *: <syntaxhighlight lang="java"> public interface Future<V> { V get() throws . .; V get(long timeout, TimeUnit unit) throws . .; void cancel (boolean maylnterrupt); boolean isCancelled(); boolean isDone(); } </syntaxhighlight> * '''FutureTask''':包装器,用于将“Callable”转换成“Future”和“Runnable”; *: <syntaxhighlight lang="java"> public class FutureTask<V> implements RunnableFuture<V> { </syntaxhighlight> *: FutureTask 实现了“RunnableFuture<V>”接口,而“RunnableFuture<V>”同时实现了“Runnable”和“Future<V>”接口; 示例: <syntaxhighlight lang="java"> Callable<Integer> myComputation = . . .; FutureTask<Integer> task = new FutureTask<Integer>(myConiputation); Thread t = new Thread(task); // it's a Runnable t.start(); Integer result = task.get(); // it's a Future </syntaxhighlight> === 相关方法 === ===== java.util.concurrent.Callable<V> ===== * V call() *: 运行一个将产生结果的任务。 ===== java.util.concurrent.Future<V> ===== * V get() * V get(1ong time, TimeUnit unit) *: 获取结果,如果没有结果可用,则阻塞直到真正得到结果超过指定的时间为止。如果不成功, 第二个方法会拋出'''TimeoutException''' 异常。 *: 如果运行该计算的线程被中断,两个方法都将拋出'''InterruptedException'''。 * boolean cancel(boolean maylnterrupt) *: 尝试取消这一任务的运行。如果任务已经开始, 并且maylnterrupt 参数值为true, 它就会被中断。如果成功执行了取消操作, 返回true。 * boolean isCancelled() *: 如果任务在完成前被取消了, 则返回true。 * boolean isDone() *: 如果任务结束,无论是正常结束、中途取消或发生异常, 都返回true。 ===== java.util.concurrent.FutureTask<V> ===== * FutureTask(Cal1able< V > task) * FutureTask(Runnable task, V result) *: 构造一个既是Future<V> 又是Runnable 的对象。 == 执行器 == == 同步器 == == 线程与Swing ==
返回至“
核心技术:并发
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息