多线程
线程状态:
wait和sleep:
- wait()属于Object方法,调用之后会强制释放当前对象锁,调用时必须拿到当前对象的监视器monitor对象,所以必须在 synchronized 保护的代码中使用,而 sleep 方法并没有这个要求。
- wait 方法会主动释放 monitor 锁,在同步代码中执行 sleep 方法时,并不会释放 monitor 锁。
- wait 方法意味着永久等待,直到被中断或被唤醒才能恢复,不会主动恢复,sleep 方法中会定义一个时间,时间到期后会主动恢复。
- wait/notify 是 Object 类的方法,而 sleep 是 Thread 类的方法。

怎么创建线程:
- 继承Thread类,重写run方法,并通过调用start方法启动线程。
- 实现Runnable接口,并重写run方法。
- 使用匿名内部类,或者Lambda表达式,适用于简单的任务。
- 使用线程池,当然不一定会创建线程。
class MyThread extends Thread {
public void run() {
// 线程任务逻辑
}
}
// 创建并启动线程
MyThread myThread = new MyThread();
myThread.start();
//Runnable
class MyRunnable implements Runnable {
public void run() {
// 线程任务逻辑
}
}
//Callable
MyCallable myCallable = new MyCallable();
FutureTask<Integer> futureTask = new FutureTask<>(myCallable);
Thread thread = new Thread(futureTask);
futureTask.get();
//Runnable
Thread thread = new Thread(new MyRunnable());
thread.start();
//匿名内部类
Thread thread = new Thread(new Runnable(){...run()});
//Lambda表达式
Thread thread = new Thread(()->{});
//线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.submit(()->{});
怎么停止线程:
- 使用原子标志位,将逻辑嵌入业务代码。
- 使用
interrupt
方法,在线程中检测Thread.currentThread().isInterrupted()
进行处理。 - 使用
stop
方法,不推荐,会导致强制停止,可能造成资源泄露。(会释放synchronized而不会自动缩放ReentranLock) - 如果是线程池,可以使用ExecutorService的
shutdown
或者shutdownNow
。 - 如果线程通过
Future
启动,可以使用cancel
方法取消执行。
线程池:
参数:
参数名称 | 描述 |
---|---|
corePoolSize | 线程池中的核心线程数。即使线程处于空闲状态,也会保持在线程池中,除非设置了allowCoreThreadTimeOut。(核心数*(1+等待/运行)) |
maximumPoolSize | 线程池中允许的最大线程数。当队列满了并且正在运行的线程数小于maximumPoolSize时,会创建新的线程。(可以设置压测值) |
keepAliveTime | 当线程数大于核心线程数时,多余的空闲线程在这个时间内没有任务可执行就会被终止。 |
unit | keepAliveTime的时间单位,如秒、分钟等。 |
workQueue | 用于保存等待执行的任务的阻塞队列。 |
threadFactory | 每个线程创建都会调用的方法,可以用来设置线程的一些属性,比如是否是守护线程等。 |
handler | 当线程和队列都满了之后,新提交的任务会触发这个拒绝策略处理器。 |
常用线程池:
线程池类型 | 使用场景 | 优点 | 缺点 | 参数 |
---|---|---|---|---|
FixedThreadPool | 适用于需要限定并发线程数量的场景,保证了任何时候最多有固定数量的线程同时运行。 | - 控制资源消耗; - 提供了对线程数量的精确控制。 | 不适合处理大量耗时的任务,因为可能会导致内存溢出。 | 允许队列长度为max int |
CachedThreadPool | 适用于执行很多短期异步任务的小程序,或负载较轻的服务器。 | - 自动回收旧的线程并创建新的线程; - 几乎无限制地创建新线程(注意可能导致内存溢出)。 | 可能会创建大量的线程,导致系统资源耗尽。 | 允许创建线程数为max int |
ScheduledThreadPool | 适用于需要定时或周期性执行任务的场景。 | - 支持定时及周期性任务执行; - 能够灵活配置核心线程数。 | 如果任务执行时间比计划间隔长,则会导致任务积压。 | 允许创建线程数为max int |
SingleThreadExecutor | 适用于需要顺序执行每个任务,并且在任意时间点都不会有多个线程活动的场景。 | - 全部任务都在一个线程中按顺序执行,避免了上下文切换的开销。 | 性能瓶颈明显,因为所有任务必须排队等待前一个任务完成。 | 允许队列长度为max int |
线程池状态:
状态 | 处理 |
---|---|
RUNNING | 会接收新任务并且会处理队列中的任务 |
SHUTDOWN | 不会接收新任务并且会处理队列中的任务,任务处理完后会中断所有线程 |
STOP | 不会接收新任务并且不会处理队列中的任务,并且会直接中断所有线程 |
TIDYING | 所有线程都停止了之后,线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated() |
TERMINATED | terminated()执行完之后就会转变为TERMINATED |
转变前 | 转变后 | 转变条件 |
---|---|---|
RUNNING | SHUTDOWN | 手动调用shutdown()触发,或者线程池对象GC时会调用finalize()从而调用shutdown() |
RUNNING | STOP | 手动调用shutdownNow()触发 |
SHUTDOWN | STOP | 手动先调用shutdown()紧着调用shutdownNow()触发 |
SHUTDOWN | TIDYING | 线程池所有线程都停止后自动触发 |
STOP | TIDYING | 线程池所有线程都停止后自动触发 |
TIDYING | TERMINATED | 线程池自动调用terminated()后触发 |
拒绝策略:
- AbortPolicy:直接抛出异常,阻止系统正常运行。可以根据业务逻辑选择重试或者放弃提交等策略。
- CallerRunsPolicy :只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。不会造成任务丢失,同时减缓提交任务的速度,给执行任务缓冲时间。
- DiscardOldestPolicy :丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务。
- DiscardPolicy :该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
线程池流程:
- 在使用execute()方法提交一个Runnable对象时
- 会先判断当前线程池中的线程数是否小于corePoolSize
- 如果小于,则创建新线程并执行Runnable
- 如果大于等于,则尝试将Runnable加入到workQueue中
- 如果workQueue没满,则将Runnable正常入队,等待执行
- 如果workQueue满了,则会入队失败,那么会尝试继续增加线程
- 如果当前线程池中的线程数是否小于maximumPoolSize
- 如果小于,则创建新线程并执行任务(这里是非公平的,可能比队列中任务先执行)
- 如果大于等于,则执行拒绝策略,拒绝此Runnable
- 【tips:tomcat自己的线程池会优先启动线程,上限再将任务入队】
- 【tips:execute执行,异常会移除异常线程并新建,submit是不会移除也不会创建】
线程池是否完成:
- isTerminated:操作,执行
pool.shutdown
后判断pool.isTerminated
是否成功,操作简单,但是需要有关闭线程池的操作。 - getCompletedTaskCount:使用
getTaskCount()
获取线程池中已经完成和正在执行的任务数量,getCompletedTaskCount
返回以及执行完成的任务总数(这两个方法返回的都是近似的,因为任务和线程状态可能在计算中动态改变),这种方式不需要关闭线程池,但是需要确保不会有新任务进行提交。 - CountDownLatch:需要在线程池任务中执行
CountDownLatch.countDown()
方法对计数器减一,随后调用CountDownLatch.await()
方法阻塞直到完成,这种方式需要提前知道线程数,性能较差,还需要任务加上异常判断,否则可能导致计数器无法完成闭环导致阻塞。 - 计数器:类似CountDownLatch,自己维护一个原子变量,或者使用锁进行计数器操作。
- Future:使用Future获取线程池的任务状态,调用
future.isDone()
判断是否完成,需要每个线程池都关联一个future对象。
Fork/Join:
设计目的和策略
- Fork/Join框架:主要目的是通过并行化处理来加速计算密集型任务的执行。它采用分治法将大任务递归分解为更小的任务,直至这些子任务足够简单可以直接解决。这种策略特别适合那些可以自然分割的问题,如数组排序或搜索等。
- 传统线程池:旨在提供一种有效管理线程生命周期的方法,减少频繁创建和销毁线程带来的开销,并且可以根据需要灵活调整并发级别。它可以适应多种不同类型的任务,无论是计算密集型还是I/O密集型。
任务分配和执行
Fork/Join框架:
- 使用
ForkJoinPool
作为其执行环境。 - 支持工作窃取算法,使得空闲线程可以从其他忙碌线程的工作队列中“偷取”任务执行,以此平衡负载。
- 自动管理任务的分裂(fork)与合并(join),简化了开发者的负担。
- 使用
传统线程池:
- 通常遵循先进先出(FIFO)原则来调度任务。
- 开发者需手动管理任务的划分与协调,增加了复杂度但提供了更大的灵活性。
集成与扩展性
- Fork/Join框架:内置了对阻塞队列的支持,能够自动处理任务队列的管理和线程的阻塞等待,这有助于提高效率和简化编程模型。
- 传统线程池:虽然也支持使用阻塞队列,但这往往需要开发者根据具体需求进行配置或选择合适的第三方库实现,提供了更多的定制可能性但也要求更高的专业知识。
适用场景
- Fork/Join框架:最适合于那些可以通过递归分解为独立子问题的任务,尤其是当任务之间几乎没有依赖关系时。例如,在数据处理、图像渲染等领域表现出色。
- 传统线程池:适用于广泛的场景,包括但不限于计算密集型任务、I/O密集型任务、定时任务等。对于不需要或不适合细粒度并行化的任务,传统线程池可能是更好的选择。
ThreadLocal:
ThreadLocal 是一个泛型类,保证可以接受任何类型的对象,最终的变量是放在了当前线程的 ThreadLocalMap 中,并不是存在 ThreadLocal 上,ThreadLocal 可以理解为只是ThreadLocalMap的封装,传递了变量值。
- ThreadLocal是Java中所提供的线程本地存储机制,可以利用该机制将数据缓存在某个线程内部,该线程可以在任意时刻、任意方法中获取缓存的数据。
- ThreadLocal底层是通过ThreadLocalMap来实现的,每个Thread对象(注意不是ThreadLocal对象)中都存在一个ThreadLocalMap,Map的key为ThreadLocal对象,Map的value为需要缓存的值。
- 在线程池中使用ThreadLocal可能会造成内存泄漏,而线程对象是通过强引用指向ThreadLocalMap,ThreadLocalMap也是通过强引用指向Entry对象,线程不被回收,Entry对象也就不会被回收,从而出现内存泄漏,解决办法是,在使用了ThreadLocal对象之后,手动调用ThreadLocal的remove方法,手动清楚Entry对象。
内存泄露:
ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,⽽ value 是强引⽤。弱引用的特点是,如果这个对象持有弱引用,那么在下一次垃圾回收的时候必然会被清理掉。【如果key设计为强引用,问题会更严重,只要ThreadLocalMap
还存在对ThreadLocal
实例的引用,即使应用程序代码中已经不再使用该ThreadLocal
实例,它也不会被垃圾回收器回收。】
ThreadLocalMap实现中已经考虑了这种情况,在调用 set()、get()、remove() 方法的时候,会清理掉 key 为 null 的记录。如果说会出现内存泄漏,那只有在出现了 key 为 null 的记录后,没有手动调用 remove() 方法,并且之后也不再调用 get()、set()、remove() 方法的情况下。因此使用完ThreadLocal 方法后,最好手动调用 remove() 方法。
CompletableFuture:
CompletableFuture实现了Future和CompletionStage接口,所以CompletableFuture首先是一个Future,除开拥有Future的功能外,还拥有CompletionStage接口所提供的功能,而CompletionStage表示异步执行整个过程中的某个步骤,表示我们可以利用CompletableFuture来对异步执行的多个任务进行编排(这是Future所不支持的)。
//定义
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {}
//创建
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
@FunctionalInterface
public interface Supplier<T> {
T get();
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
//示例
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Supplier<String> taskA = () -> {
System.out.println("1:" + Thread.currentThread().getName());
return "Hello";
};
Function<String, String> taskB = s -> {
System.out.println("2:" + Thread.currentThread().getName());
return s + " World";
};
CompletableFuture.supplyAsync(taskA, executorService)
.thenApply(taskB);
}
//or
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return 42;
});
// 非阻塞链式操作
CompletableFuture<Void> resultFuture = future.thenApply(result -> {
System.out.println("Result: " + result);
return result * 2;
}).thenAccept(result -> System.out.println("Transformed Result: " + result));
resultFuture.get(); // 等待所有阶段完成
}
}
线程和进程:
特性 | 进程 | 线程 |
---|---|---|
资源管理 | 拥有独立的资源,如内存空间、文件描述符等。 | 共享所属进程的资源,线程间可以通过全局变量等方式直接通信。 |
开销 | 创建和销毁的开销大,进程间的切换也需要较多的时间和资源。 | 相对于进程,创建和销毁的开销较小,线程间的切换也更加高效。 |
内存空间 | 每个进程都有自己独立的地址空间。 | 同一进程内的所有线程共享同一个地址空间。 |
安全性 | 进程间相互隔离,安全性高。 | 因为共享资源,所以可能存在竞争条件,需要同步机制来保证数据一致性。 |
通信方式 | 进程间通信(IPC)较为复杂,通常使用管道、消息队列、信号量等方式。 | 线程间可以直接访问同一进程内的任何数据,通信更为直接。 |
故障影响 | 一个进程崩溃不会直接影响其他进程。 | 一个线程如果出现致命错误可能导致整个进程崩溃。 |